Skip to content

Commit e1d7d49

Browse files
committed
IGNITE-27678 Same partitions on different nodes can hold different updates if writeThrough is enabled
1 parent 257d6b5 commit e1d7d49

4 files changed

Lines changed: 283 additions & 8 deletions

File tree

modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.ignite.util.GridCommandHandlerPropertiesTest;
3535
import org.apache.ignite.util.GridCommandHandlerScheduleIndexRebuildTest;
3636
import org.apache.ignite.util.GridCommandHandlerTracingConfigurationTest;
37+
import org.apache.ignite.util.IdleVerifyCheckWithWriteThroughTest;
3738
import org.apache.ignite.util.IdleVerifyDumpTest;
3839
import org.apache.ignite.util.MetricCommandTest;
3940
import org.apache.ignite.util.PerformanceStatisticsCommandTest;
@@ -77,7 +78,8 @@
7778

7879
SecurityCommandHandlerPermissionsTest.class,
7980

80-
IdleVerifyDumpTest.class
81+
IdleVerifyDumpTest.class,
82+
IdleVerifyCheckWithWriteThroughTest.class
8183
})
8284
public class IgniteControlUtilityTestSuite2 {
8385
}
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.util;
19+
20+
import java.util.Collection;
21+
import java.util.List;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.regex.Pattern;
26+
import javax.cache.configuration.Factory;
27+
import javax.cache.integration.CacheWriterException;
28+
import org.apache.ignite.Ignite;
29+
import org.apache.ignite.IgniteCache;
30+
import org.apache.ignite.IgniteException;
31+
import org.apache.ignite.cache.CacheAtomicityMode;
32+
import org.apache.ignite.cache.CacheMode;
33+
import org.apache.ignite.cache.store.CacheStore;
34+
import org.apache.ignite.cache.store.CacheStoreSession;
35+
import org.apache.ignite.cache.store.CacheStoreSessionListener;
36+
import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener;
37+
import org.apache.ignite.cluster.ClusterState;
38+
import org.apache.ignite.configuration.CacheConfiguration;
39+
import org.apache.ignite.configuration.IgniteConfiguration;
40+
import org.apache.ignite.failure.AbstractFailureHandler;
41+
import org.apache.ignite.failure.FailureContext;
42+
import org.apache.ignite.internal.IgniteEx;
43+
import org.apache.ignite.internal.IgniteInternalFuture;
44+
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
45+
import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy;
46+
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
47+
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
48+
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
49+
import org.apache.ignite.testframework.GridTestUtils;
50+
import org.apache.ignite.transactions.Transaction;
51+
import org.junit.Test;
52+
import org.junit.runners.Parameterized;
53+
54+
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
55+
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
56+
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
57+
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
58+
import static org.hamcrest.CoreMatchers.containsString;
59+
import static org.hamcrest.CoreMatchers.is;
60+
import static org.junit.Assert.assertThat;
61+
62+
/** */
63+
public class IdleVerifyCheckWithWriteThroughTest extends GridCommandHandlerClusterPerMethodAbstractTest {
64+
/** */
65+
private AtomicReference<Throwable> err;
66+
67+
/** Node kill trigger. */
68+
private static CountDownLatch nodeKill;
69+
70+
/** Tx message flag. */
71+
private static volatile boolean finalTxMsgPassed;
72+
73+
/** Session method flag. */
74+
private static AtomicBoolean sessionTriggered = new AtomicBoolean();
75+
76+
/** Storage exception message. */
77+
private static final String storageExceptionMessage = "Internal storage exception raised";
78+
79+
/** */
80+
@Parameterized.Parameter(1)
81+
public Boolean withPersistence;
82+
83+
/** */
84+
@Parameterized.Parameters(name = "cmdHnd={0}, withPersistence={1}")
85+
public static Collection<Object[]> parameters() {
86+
return List.of(
87+
new Object[] {CLI_CMD_HND, false},
88+
new Object[] {CLI_CMD_HND, true}
89+
);
90+
}
91+
92+
/** {@inheritDoc} */
93+
@Override protected void beforeTest() throws Exception {
94+
super.beforeTest();
95+
96+
stopAllGrids();
97+
98+
persistenceEnable(withPersistence);
99+
100+
if (withPersistence)
101+
cleanPersistenceDir();
102+
103+
err = new AtomicReference<>();
104+
105+
nodeKill = new CountDownLatch(1);
106+
sessionTriggered = new AtomicBoolean();
107+
finalTxMsgPassed = false;
108+
}
109+
110+
/** {@inheritDoc} */
111+
@Override protected boolean persistenceEnable() {
112+
return false;
113+
}
114+
115+
/** {@inheritDoc} */
116+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
117+
return super.getConfiguration(igniteInstanceName)
118+
.setCommunicationSpi(new TestRecordingCommunicationSpi())
119+
.setFailureHandler(new AbstractFailureHandler() {
120+
@Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
121+
err.compareAndSet(null, failureCtx.error());
122+
123+
return false;
124+
}
125+
});
126+
}
127+
128+
/** Test scenario:
129+
* <ul>
130+
* <li>Start 3 nodes [node0, node1, node2].</li>
131+
* <li>Start a put operation into a transactional cache for a key whose primary partition is held by [node1].</li>
132+
* <li>Kill [node1] right after the tx PREPARE stage is completed (this triggers the tx recovery procedure).</li>
133+
* </ul>
134+
*
135+
* @see IgniteTxManager#salvageTx(IgniteInternalTx)
136+
*/
137+
@Test
138+
public void testTxCoordinatorLeftClusterWithEnabledReadWriteThrough() throws Exception {
139+
// sequential start is important here
140+
startGrid(0);
141+
startGrid(1);
142+
startGrid(2);
143+
144+
injectTestSystemOut();
145+
146+
int gridToStop = 1;
147+
148+
IgniteEx instanceToStop = grid(gridToStop);
149+
instanceToStop.cluster().state(ClusterState.ACTIVE);
150+
151+
TestRecordingCommunicationSpi commSpi =
152+
(TestRecordingCommunicationSpi)instanceToStop.configuration().getCommunicationSpi();
153+
commSpi.record(GridDhtTxFinishRequest.class);
154+
155+
commSpi.blockMessages((node, msg) -> {
156+
boolean ret = msg instanceof GridDhtTxFinishRequest;
157+
158+
if (ret) {
159+
nodeKill.countDown();
160+
finalTxMsgPassed = true;
161+
}
162+
163+
return ret;
164+
});
165+
166+
MapCacheStoreStrategy strategy = new MapCacheStoreStrategy();
167+
strategy.resetStore();
168+
Factory<? extends CacheStore<Object, Object>> storeFactory = strategy.getStoreFactory();
169+
CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>("cache");
170+
ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
171+
ccfg.setCacheMode(CacheMode.REPLICATED);
172+
ccfg.setReadThrough(true);
173+
ccfg.setWriteThrough(true);
174+
ccfg.setCacheStoreFactory(storeFactory);
175+
ccfg.setCacheStoreSessionListenerFactories(new TestCacheStoreFactory());
176+
177+
IgniteCache<Integer, Object> cache = instanceToStop.createCache(ccfg);
178+
179+
awaitPartitionMapExchange();
180+
181+
IgniteInternalFuture<Object> stopFut = GridTestUtils.runAsync(() -> {
182+
nodeKill.await();
183+
stopGrid(gridToStop);
184+
});
185+
186+
// primary key for [node1]
187+
Integer primaryKey = primaryKey(cache);
188+
189+
//noinspection EmptyCatchBlock
190+
try (Transaction tx = instanceToStop.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
191+
cache.put(primaryKey, new Object());
192+
tx.commit();
193+
}
194+
catch (Throwable th) {
195+
// No op
196+
}
197+
198+
stopFut.get(getTestTimeout());
199+
200+
assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify"));
201+
202+
String out = testOut.toString();
203+
204+
assertContains(log, out, "The check procedure has failed");
205+
// Update counters are equal but size is different
206+
if (withPersistence) {
207+
assertContains(log, out, "updateCntr=[lwm=0, missed=[], hwm=0], partitionState=OWNING, size=0");
208+
assertContains(log, out, "updateCntr=[lwm=1, missed=[], hwm=1], partitionState=OWNING, size=1");
209+
}
210+
else {
211+
assertContains(log, out, "updateCntr=1, partitionState=OWNING, size=0");
212+
assertContains(log, out, "updateCntr=1, partitionState=OWNING, size=1");
213+
}
214+
testOut.reset();
215+
216+
assertNotNull(err.get());
217+
assertThat(err.get().getMessage(), is(containsString(storageExceptionMessage)));
218+
219+
if (withPersistence) {
220+
stopAllGrids();
221+
startGridsMultiThreaded(3);
222+
223+
awaitPartitionMapExchange(true, true, null);
224+
225+
assertEquals(EXIT_CODE_OK, execute("--port", connectorPort(grid(2)), "--cache", "idle_verify"));
226+
out = testOut.toString();
227+
// partVerHash are different, thus only regex check here
228+
Pattern primaryPattern = Pattern.compile("Partition instances: " +
229+
"\\[PartitionHashRecord" +
230+
".*?hwm=1\\], partitionState=OWNING, size=1" +
231+
".*?hwm=1\\], partitionState=OWNING, size=1" +
232+
".*?hwm=1\\], partitionState=OWNING, size=1");
233+
234+
boolean matches = primaryPattern.matcher(out).find();
235+
assertTrue(matches);
236+
}
237+
}
238+
239+
/** */
240+
private static class TestCacheStoreFactory implements Factory<CacheStoreSessionListener> {
241+
/** {@inheritDoc} */
242+
@Override public CacheStoreSessionListener create() {
243+
return new TestCacheJdbcStoreSessionListener();
244+
}
245+
}
246+
247+
/** */
248+
private static class TestCacheJdbcStoreSessionListener extends CacheJdbcStoreSessionListener {
249+
/** {@inheritDoc} */
250+
@Override public void start() throws IgniteException {
251+
// No op.
252+
}
253+
254+
/** {@inheritDoc} */
255+
@Override public void onSessionStart(CacheStoreSession ses) {
256+
// No op; originally a connection would be initialized here.
257+
}
258+
259+
/** {@inheritDoc} */
260+
@Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
261+
if (finalTxMsgPassed && sessionTriggered.compareAndSet(false, true))
262+
throw new CacheWriterException(storageExceptionMessage);
263+
}
264+
}
265+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.ignite.internal.processors.cache.distributed.dht;
1919

20-
import java.util.ArrayList;
2120
import java.util.Collection;
2221
import java.util.Collections;
22+
import java.util.List;
2323
import java.util.Map;
2424
import java.util.UUID;
2525
import javax.cache.processor.EntryProcessor;
@@ -226,12 +226,7 @@ public GridDhtTxRemote(
226226

227227
/** {@inheritDoc} */
228228
@Override public Collection<UUID> masterNodeIds() {
229-
Collection<UUID> res = new ArrayList<>(2);
230-
231-
res.add(nearNodeId);
232-
res.add(nodeId);
233-
234-
return res;
229+
// Compare the ids by value: '!=' on UUID tests reference identity, so two equal ids held in
// distinct UUID instances would be treated as different and the same id returned twice.
// A null nearNodeId fails with NPE here exactly as it did in List.of(...) before.
return nearNodeId.equals(nodeId) ? List.of(nearNodeId) : List.of(nearNodeId, nodeId);
235230
}
236231

237232
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.NoSuchElementException;
2929
import java.util.Set;
3030
import java.util.UUID;
31+
import java.util.function.Consumer;
3132
import javax.cache.Cache;
3233
import javax.cache.integration.CacheLoaderException;
3334
import javax.cache.integration.CacheWriterException;
@@ -38,6 +39,8 @@
3839
import org.apache.ignite.cache.store.CacheStoreSessionListener;
3940
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore;
4041
import org.apache.ignite.configuration.CacheConfiguration;
42+
import org.apache.ignite.failure.FailureContext;
43+
import org.apache.ignite.failure.FailureType;
4144
import org.apache.ignite.internal.GridKernalContext;
4245
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
4346
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -117,12 +120,17 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
117120
/** Always keep binary. */
118121
protected boolean alwaysKeepBinary;
119122

123+
/** Failure handler reaction. */
124+
private Consumer<Throwable> failureHandlerAction;
125+
120126
/** {@inheritDoc} */
121127
@SuppressWarnings("unchecked")
122128
@Override public void initialize(@Nullable CacheStore cfgStore, Map sesHolders) throws IgniteCheckedException {
123129
GridKernalContext ctx = igniteContext();
124130
CacheConfiguration cfg = cacheConfiguration();
125131

132+
failureHandlerAction = e -> ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
133+
126134
writeThrough = cfg.isWriteThrough();
127135

128136
readThrough = cfg.isReadThrough();
@@ -934,6 +942,11 @@ private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws
934942
store.sessionEnd(!threwEx);
935943
}
936944
}
945+
catch (RuntimeException e) {
946+
failureHandlerAction.accept(e);
947+
948+
throw e;
949+
}
937950
catch (Exception e) {
938951
if (!threwEx)
939952
throw U.cast(e);

0 commit comments

Comments
 (0)