Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ private AbstractMatchingHashJoin(
private void downstreamPush(Row left, Row right) throws Exception {
requested--;

downstream().push(outRowFactory.apply(left, right));;
downstream().push(outRowFactory.apply(left, right));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.cache.context.SessionContextImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProxyImpl;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryProperties;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
Expand Down Expand Up @@ -200,9 +201,15 @@ private void flushTuples(boolean force) throws IgniteCheckedException {
this.tuples = new ArrayList<>(MODIFY_BATCH_SIZE);

GridCacheContext<Object, Object> cctx = desc.cacheContext();
GridCacheProxyImpl<Object, Object> cache = cctx.cache().keepBinary();
IgniteInternalCache<Object, Object> cache = cctx.cache();
GridNearTxLocal tx = Commons.queryTransaction(context(), cctx.shared());

QueryProperties props = context().unwrap(QueryProperties.class);
boolean keepBinaryMode = props == null || props.keepBinary();

if (keepBinaryMode)
cache = cache.keepBinary();

if (tx == null)
invokeOutsideTransaction(tuples, cache);
else
Expand All @@ -217,7 +224,7 @@ private void flushTuples(boolean force) throws IgniteCheckedException {
*/
private void invokeOutsideTransaction(
List<ModifyTuple> tuples,
GridCacheProxyImpl<Object, Object> cache
IgniteInternalCache<Object, Object> cache
) throws IgniteCheckedException {
SessionContextImpl sesCtx = context().unwrap(SessionContextImpl.class);
Map<String, String> sesAttrs = sesCtx == null ? null : sesCtx.attributes();
Expand Down Expand Up @@ -251,7 +258,7 @@ private void invokeOutsideTransaction(
*/
private void invokeInsideTransaction(
List<ModifyTuple> tuples,
GridCacheProxyImpl<Object, Object> cache,
IgniteInternalCache<Object, Object> cache,
GridNearTxLocal userTx
) throws IgniteCheckedException {
userTx.resume();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class CancelTest extends GridCommonAbstractTest {
.setKeyFieldName("id")
.setValueFieldName("val")
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("val", String.class.getName(), null);;
.addQueryField("val", String.class.getName(), null);

return super.getConfiguration(igniteInstanceName)
.setCacheConfiguration(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.integration;

import java.util.Collection;
import java.util.List;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheInterceptorAdapter;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;

/** Cache interceptor related tests. */
@RunWith(Parameterized.class)
public class CacheWithInterceptorIntegrationTest extends GridCommonAbstractTest {
/** Node role. */
@Parameterized.Parameter(0)
public boolean keepBinary;

/** */
@Parameterized.Parameters(name = "keepBinary={0}")
public static Collection<?> parameters() {
return List.of(true, false);
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

stopAllGrids(true);
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
var entity0 = new QueryEntity()
.setTableName("Pure")
.setKeyType(Integer.class.getName())
.setValueType(String.class.getName())
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("name", String.class.getName(), null)
.setKeyFieldName("id")
.setValueFieldName("name");

var entity1 = new QueryEntity()
.setTableName("CITY")
.setKeyType(Integer.class.getName())
.setValueType(City.class.getName())
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("name", String.class.getName(), null)
.setKeyFieldName("id");

var entity2 = new QueryEntity()
.setTableName("PERSON")
.setKeyType(Integer.class.getName())
.setValueType(Person.class.getName())
.addQueryField("id", Integer.class.getName(), null)
.addQueryField("name", String.class.getName(), null)
.addQueryField("city_id", Integer.class.getName(), null)
.setKeyFieldName("id");

var cacheCfg = new CacheConfiguration<Integer, Object>(DEFAULT_CACHE_NAME)
.setAtomicityMode(TRANSACTIONAL)
.setSqlSchema("PUBLIC")
.setInterceptor(new TestCacheInterceptor(keepBinary))
.setQueryEntities(List.of(entity1, entity2));

var pureCacheCfg = new CacheConfiguration<Integer, Object>("Pure")
.setAtomicityMode(TRANSACTIONAL)
.setSqlSchema("PUBLIC")
.setInterceptor(new TestAlwaysUnwrappedValCacheInterceptor())
.setQueryEntities(List.of(entity0));

var calciteQryEngineCfg = new CalciteQueryEngineConfiguration().setDefault(true);

return super.getConfiguration(igniteInstanceName)
.setSqlConfiguration(new SqlConfiguration().setQueryEnginesConfiguration(calciteQryEngineCfg))
.setTransactionConfiguration(new TransactionConfiguration().setTxAwareQueriesEnabled(true))
.setCacheConfiguration(cacheCfg, pureCacheCfg);
}

/** Test object unwrapped on interceptor side if applicable. */
@Test
public void testInterceptorUnwrapValIfNeeded() throws Exception {

Check failure on line 116 in modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CacheWithInterceptorIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add at least one assertion to this test case.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ0-YHewgwD9Sv-kIQM1&open=AZ0-YHewgwD9Sv-kIQM1&pullRequest=12911
startGrid(0);
Ignite client = startClientGrid("client");

IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME);

if (keepBinary)
cache = cache.withKeepBinary();

int incParam = 0;

try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
cache.query(new SqlFieldsQuery("insert into PUBLIC.PURE(id, name) values (?, 'val')").setArgs(incParam++)).getAll();
cache.query(new SqlFieldsQuery("insert into PUBLIC.CITY(id, name) values (?, 'val')").setArgs(incParam++)).getAll();
cache.query(new SqlFieldsQuery("insert into PUBLIC.PERSON(id, name, city_id) values (?, 'val', 1)").setArgs(incParam++))
.getAll();

tx.commit();
}

cache.query(new SqlFieldsQuery("insert into PUBLIC.PURE(id, name) values (?, 'val')").setArgs(incParam++)).getAll();
cache.query(new SqlFieldsQuery("insert into PUBLIC.CITY(id, name) values (?, 'val')").setArgs(incParam++)).getAll();
cache.query(new SqlFieldsQuery("insert into PUBLIC.PERSON(id, name, city_id) values (?, 'val', 1)").setArgs(incParam)).getAll();
}

/** */
private static class City {
/** */
@GridToStringInclude
int id;

/** */
@GridToStringInclude
String name;

/** */
City(int id, String name) {
this.id = id;
this.name = name;
}
}

/** */
private static class Person {
/** */
@GridToStringInclude
int id;

/** */
@GridToStringInclude
String name;

/** */
@GridToStringInclude
int city_id;

Check warning on line 170 in modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CacheWithInterceptorIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this field "city_id" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ0-YHewgwD9Sv-kIQM3&open=AZ0-YHewgwD9Sv-kIQM3&pullRequest=12911

/** */
Person(int id, String name, int city_id) {

Check warning on line 173 in modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/CacheWithInterceptorIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this local variable to match the regular expression '^[a-z][a-zA-Z0-9]*$'.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ0-YHewgwD9Sv-kIQM2&open=AZ0-YHewgwD9Sv-kIQM2&pullRequest=12911
this.id = id;
this.name = name;
this.city_id = city_id;
}
}

/** */
private static class TestAlwaysUnwrappedValCacheInterceptor extends CacheInterceptorAdapter<Integer, Object> {
/** {@inheritDoc} */
@Override public @Nullable Object onBeforePut(Cache.Entry<Integer, Object> entry, Object newVal) {
assertFalse(newVal instanceof BinaryObject);

return newVal;
}
}

/** */
private static class TestCacheInterceptor extends CacheInterceptorAdapter<Integer, Object> {
/** */
private final boolean keepBinary;

/**
* @param keepBinary Keep binary defines flag.
*/
TestCacheInterceptor(boolean keepBinary) {
this.keepBinary = keepBinary;
}

/** {@inheritDoc} */
@Override public @Nullable Object onBeforePut(Cache.Entry<Integer, Object> entry, Object newVal) {
if (keepBinary)
assertTrue(newVal instanceof BinaryObject);
else
assertFalse(newVal instanceof BinaryObject);

return newVal;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ public static Collection<?> parameters() {
exceptionRaised.set(0);
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

stopAllGrids(true);
}

/** */
private static class FilterDefinedNode implements IgnitePredicate<ClusterNode> {
/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.CacheStoreTest;
import org.apache.ignite.internal.processors.query.calcite.integration.CacheWithInterceptorIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.CalciteBasicSecondaryIndexIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.CalciteErrorHandlilngIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.CalcitePlanningDumpTest;
Expand Down Expand Up @@ -174,7 +175,8 @@
QueryEntityValueColumnAliasTest.class,
CacheStoreTest.class,
MultiDcQueryMappingTest.class,
TxWithExceptionalInterceptorTest.class
TxWithExceptionalInterceptorTest.class,
CacheWithInterceptorIntegrationTest.class
})
public class IntegrationTestSuite {
}
Original file line number Diff line number Diff line change
Expand Up @@ -3062,7 +3062,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED)
evt.message("Client node reconnected");

else
assert false : "Unexpected discovery message type: " + type;;
assert false : "Unexpected discovery message type: " + type;

ctx.event().record(evt, discoCache);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.cache.expiry.ExpiryPolicy;
Expand Down Expand Up @@ -181,6 +182,24 @@ public boolean skipReadThrough() {
return skipReadThrough;
}

/**
* See {@link IgniteInternalCache#withApplicationAttributes(Map)}.
*
* @return New instance of CacheOperationContext with new application attributes.
*/
public CacheOperationContext withApplicationAttributes(Map<String, String> attrs) {
return new CacheOperationContext(
skipStore,
skipReadThrough,
keepBinary,
expiryPlc,
noRetries,
dataCenterId,
recovery,
readRepairStrategy,
Collections.unmodifiableMap(attrs));
}

/**
* See {@link IgniteInternalCache#withSkipReadThrough()}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,28 @@ public void active(boolean active) {
return new GridCacheProxyImpl<>(this.ctx, this, opCtx);
}

/** @return New internal cache instance based on this one, but with application attributes. */
@Override public GridCacheProxyImpl<K, V> withApplicationAttributes(Map<String, String> attrs) {
CacheOperationContext opCtx = ctx.operationContextPerCall();

if (opCtx == null) {
opCtx = new CacheOperationContext(
false,
false,
false,
null,
false,
null,
false,
null,
new HashMap<>(attrs));
}
else
opCtx = opCtx.withApplicationAttributes(attrs);

return new GridCacheProxyImpl<>(ctx, this, opCtx);
}

/** {@inheritDoc} */
@Override public final <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() {
CacheOperationContext opCtx = new CacheOperationContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,15 @@ public IgniteInternalCache<K, V> delegate() {
}

/** @return New internal cache instance based on this one, but with application attributes. */
public GridCacheProxyImpl<K, V> withApplicationAttributes(Map<String, String> attrs) {
@Override public GridCacheProxyImpl<K, V> withApplicationAttributes(Map<String, String> attrs) {
CacheOperationContext prev = gate.enter(opCtx);

try {
return new GridCacheProxyImpl<>(ctx, delegate,
opCtx != null ? opCtx.setApplicationAttributes(attrs) :
new CacheOperationContext(
false,
true,
false,
false,
null,
false,
Expand Down
Loading
Loading