From 5669dc17cfbcf1c1e08ff91f605f6093f228cf49 Mon Sep 17 00:00:00 2001
From: Runtian Liu
Date: Fri, 7 Nov 2025 13:15:42 -0800
Subject: [PATCH] Add feature to ignore pending node from node replacement
 during write consistency level calculation

---
 conf/cassandra.yaml                                  |   7 +
 .../org/apache/cassandra/config/Config.java          |   6 +
 .../cassandra/config/DatabaseDescriptor.java         |   7 +
 .../apache/cassandra/db/ConsistencyLevel.java        |  33 +-
 .../cassandra/locator/TokenMetadata.java             |   1 -
 .../service/AbstractWriteResponseHandler.java        |  21 +-
 .../DatacenterSyncWriteResponseHandler.java          |  12 +
 .../DatacenterWriteResponseHandler.java              |   8 +-
 .../service/WriteResponseHandler.java                |   9 +-
 .../test/ReplacementPendingWritesTest.java           | 426 ++++++++++++++++++
 10 files changed, 523 insertions(+), 7 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ReplacementPendingWritesTest.java

diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 043ee9557b41..b97b3e99bbb5 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1779,6 +1779,13 @@ drop_compact_storage_enabled: false
 # group_by_enabled: true
 # Guardrail to allow/disallow TRUNCATE and DROP TABLE statements
 # drop_truncate_table_enabled: true
+#
+# During node replacement, exclude the replacement-pending replica from the write blockFor calculation.
+# Default: false (keeps the current behavior, where all pending replicas are included in blockFor).
+# When enabled, a write during node replacement only needs to satisfy the consistency level on the natural replicas.
+# For a normal bootstrap (non-replacement), pending replicas are still included.
+# write_requests_not_wait_on_pending_replacements: false
+#
 # Guardrail to warn or fail when using a page size greater than threshold.
 # The two thresholds default to -1 to disable.
 # page_size_warn_threshold: -1
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 21ca1b595cd1..86429687ed7f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -845,6 +845,12 @@ public static void setClientMode(boolean clientMode)
     public volatile Set<ConsistencyLevel> read_consistency_levels_disallowed = Collections.emptySet();
     public volatile Set<ConsistencyLevel> write_consistency_levels_warned = Collections.emptySet();
     public volatile Set<ConsistencyLevel> write_consistency_levels_disallowed = Collections.emptySet();
+    // During node replacement, exclude the replacement-pending replica from the write blockFor calculation.
+    // Default: false (keeps the current behavior, where all pending replicas are included in blockFor).
+    // When enabled, a write during node replacement only needs to satisfy the consistency level on the natural replicas.
+    // For a normal bootstrap (non-replacement), pending replicas are still included.
+    public volatile boolean write_requests_not_wait_on_pending_replacements = false;
+
     public volatile boolean user_timestamps_enabled = true;
     public volatile boolean group_by_enabled = true;
     public volatile boolean drop_truncate_table_enabled = true;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index f102c3f7293f..1da181275f32 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2137,6 +2137,13 @@ public static void setCompactionTombstoneWarningThreshold(int count)
         conf.compaction_tombstone_warning_threshold = count;
     }
 
+    public static boolean isExcludeReplacementPendingForWrite() { return conf.write_requests_not_wait_on_pending_replacements; }
+
+    public static void setExcludeReplacementPendingForWrite(boolean value)
+    {
+        conf.write_requests_not_wait_on_pending_replacements = value;
+    }
+
     public static int getConcurrentValidations()
     {
         return conf.concurrent_validations;
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 843ccb92490e..828b7dd8b45d 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -28,8 +28,12 @@
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.ProtocolException;
 
+import java.util.Optional;
+
 import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
 
 public enum ConsistencyLevel
@@ -174,23 +178,48 @@ public int blockForWrite(AbstractReplicationStrategy replicationStrategy, Endpoi
         assert pending != null;
 
         int blockFor = blockFor(replicationStrategy);
+
+        // Filter out replacement-pending replicas if the feature is enabled
+        Endpoints<?> pendingToAdd = pending;
+        if (DatabaseDescriptor.isExcludeReplacementPendingForWrite())
+        {
+            TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+            if (tokenMetadata != null)
+            {
+                pendingToAdd = filterOutReplacementPendingReplicas(pending, tokenMetadata);
+            }
+        }
+
         switch (this)
         {
             case ANY:
                 break;
             case LOCAL_ONE: case LOCAL_QUORUM: case LOCAL_SERIAL:
                 // we will only count local replicas towards our response count, as these queries only care about local guarantees
-                blockFor += pending.count(InOurDc.replicas());
+                blockFor += pendingToAdd.count(InOurDc.replicas());
                 break;
             case ONE: case TWO: case THREE:
             case QUORUM: case EACH_QUORUM:
            case SERIAL: case ALL:
-                blockFor += pending.size();
+                blockFor += pendingToAdd.size();
         }
 
         return blockFor;
     }
 
+    /**
+     * Filters out replacement-pending replicas from the given endpoints collection.
+     * A pending replica is considered a replacement if it is replacing an existing node.
+     *
+     * @param pending the endpoints to filter
+     * @param tokenMetadata the token metadata containing replacement node information
+     * @return filtered endpoints excluding replacement-pending replicas
+     */
+    private static Endpoints<?> filterOutReplacementPendingReplicas(Endpoints<?> pending, TokenMetadata tokenMetadata)
+    {
+        return pending.filter(replica -> tokenMetadata.getReplacingNode(replica.endpoint()).isEmpty());
+    }
+
     /**
      * Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace
      * WARNING: this is not locality aware; you cannot safely use this with mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM)
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 112a8c58b5b6..68b5f0524496 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -445,7 +445,6 @@ public Optional getReplacingNode(InetAddressAndPort endpoint
             lock.readLock().unlock();
         }
     }
-
     public void removeBootstrapTokens(Collection<Token> tokens)
     {
         assert tokens != null && !tokens.isEmpty();
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 4bb122304da6..52bebc72b47b 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -26,12 +26,14 @@
 import javax.annotation.Nullable;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.slf4j.Logger;
@@ -247,7 +249,24 @@ public ConsistencyLevel consistencyLevel()
      */
     protected boolean waitingFor(InetAddressAndPort from)
     {
-        return true;
+        return !shouldSkipWaiting(from);
+    }
+
+    protected boolean shouldSkipWaiting(InetAddressAndPort from)
+    {
+        // If the feature is enabled, don't wait for replacement-pending replicas
+        if (DatabaseDescriptor.isExcludeReplacementPendingForWrite())
+        {
+            TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
+
+            if (tokenMetadata != null && (tokenMetadata.getReplacingNode(from).isPresent() || tokenMetadata.getReplacementNode(from).isPresent()))
+            {
+                // This is a replacement node or a node being replaced; don't count its response
+                return true;
+            }
+        }
+        return false;
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index e4b208b582fb..14a6e4a88e14 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -28,6 +28,7 @@
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -69,8 +70,12 @@ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
 
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
         // guarantees (see #833)
+        // However, if the feature is enabled, exclude replacement-pending replicas
         for (Replica pending : replicaPlan.pending())
         {
+            // Skip replacement-pending replicas if the feature is enabled
+            if (shouldSkipWaiting(pending.endpoint()))
+                continue;
             responses.get(snitch.getDatacenter(pending)).incrementAndGet();
         }
     }
@@ -79,6 +84,13 @@ public void onResponse(Message message)
     {
         try
         {
+            // Only process the message if we are actually waiting for this response
+            if (message != null && !waitingFor(message.from()))
+            {
+                logResponseToIdealCLDelegate(message);
+                return;
+            }
+
             String dataCenter = message == null
                                 ? DatabaseDescriptor.getLocalDataCenter()
                                 : snitch.getDatacenter(message.from());
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index f33d6607e1c2..10d790169f30 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -17,11 +17,13 @@
  */
 package org.apache.cassandra.service;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.InOurDc;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.transport.Dispatcher;
 
@@ -63,6 +65,10 @@ public void onResponse(Message message)
     @Override
     protected boolean waitingFor(InetAddressAndPort from)
     {
-        return waitingFor.test(from);
+        // First check that the responder is in our local DC
+        if (!waitingFor.test(from))
+            return false;
+
+        return super.waitingFor(from);
     }
 }
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index ec18238f9932..6fa259834c5c 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -57,8 +57,13 @@ public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeTyp
 
     public void onResponse(Message<T> m)
     {
-        if (responsesUpdater.decrementAndGet(this) == 0)
-            signal();
+        // Only decrement if we are waiting for this response;
+        // if m is null, the response is from the local node
+        if (m == null || waitingFor(m.from()))
+        {
+            if (responsesUpdater.decrementAndGet(this) == 0)
+                signal();
+        }
         //Must be last after all subclass processing
         //The two current subclasses both assume logResponseToIdealCLDelegate is called
         //here.
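The following is not part of the patch: a minimal, self-contained sketch of the blockFor arithmetic that the ConsistencyLevel and response-handler changes above affect. It is plain Java for illustration only, assuming the standard QUORUM formula (RF / 2 + 1); the helper names (quorumFor, blockForLegacy, blockForWithFeature) are hypothetical and do not exist in Cassandra.

// Illustration only: how blockFor changes for a QUORUM write during a host replacement.
public final class BlockForWriteSketch
{
    // QUORUM block-for on the natural replicas: rf / 2 + 1
    static int quorumFor(int rf)
    {
        return rf / 2 + 1;
    }

    // Current behavior: every pending replica (bootstrap or replacement) is added to blockFor.
    static int blockForLegacy(int rf, int pendingBootstraps, int pendingReplacements)
    {
        return quorumFor(rf) + pendingBootstraps + pendingReplacements;
    }

    // With write_requests_not_wait_on_pending_replacements enabled: replacement-pending
    // replicas are filtered out, while bootstrap-pending replicas still count.
    static int blockForWithFeature(int rf, int pendingBootstraps, int pendingReplacements)
    {
        return quorumFor(rf) + pendingBootstraps;
    }

    public static void main(String[] args)
    {
        // RF=3, QUORUM, one node being replaced while its replacement is still pending:
        // today the coordinator must collect 3 acks; with the feature enabled, 2 are enough.
        System.out.println(blockForLegacy(3, 0, 1));      // 3
        System.out.println(blockForWithFeature(3, 0, 1)); // 2
    }
}

For RF=3 with one host being replaced, the legacy path requires three acks even though only two natural replicas are alive, which is exactly the timeout scenario the distributed test below reproduces.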
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReplacementPendingWritesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReplacementPendingWritesTest.java
new file mode 100644
index 000000000000..7e927ba9938a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReplacementPendingWritesTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.StorageService;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static org.junit.Assert.fail;
+
+/**
+ * Advanced test for the write_requests_not_wait_on_pending_replacements feature, using ByteBuddy.
+ *
+ * This test demonstrates the behavior change by:
+ * 1. Creating a 3-node cluster with RF=3
+ * 2. Shutting down one node
+ * 3. Starting a replacement node that is slow to respond
+ * 4. Showing that writes time out without the feature
+ * 5. Showing that writes succeed with the feature enabled
+ */
+public class ReplacementPendingWritesTest extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(ReplacementPendingWritesTest.class);
+    private static final String KEYSPACE_NETWORKTOPOLOTY = "replacement_pending_writes_networktopology";
+    private static final String KEYSPACE_SIMPLE = "replacement_pending_writes_simple";
+
+    /**
+     * Execute a write at the given consistency level against the given keyspace and expect it to succeed
+     */
+    private void executeSuccessfulWrite(IInvokableInstance node, int pk, int ck, int v, ConsistencyLevel cl, String keyspaceName) throws Exception
+    {
+        node.coordinator().execute("INSERT INTO " + keyspaceName + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
+                                   cl, pk, ck, v);
+    }
+
+    /**
+     * Execute a write at the given consistency level against the given keyspace and expect it to fail
+     * with either a write timeout or an unavailable exception
+     */
+    private void executeWriteExpectingException(IInvokableInstance node, int pk, int ck, int v, ConsistencyLevel cl, String keyspaceName, String expectedMessage) throws Exception
+    {
+        try
+        {
+            node.coordinator().execute("INSERT INTO " + keyspaceName + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
+                                       cl, pk, ck, v);
+            fail("Should have thrown a write timeout exception");
+        }
+        catch (Exception e)
+        {
+            if (e.getClass().getName().equals(WriteTimeoutException.class.getName()))
+            {
+                Assert.assertEquals(expectedMessage, e.getMessage());
+            }
+            else if (e.getClass().getName().equals(UnavailableException.class.getName()))
+            {
+                Assert.assertEquals("Cannot achieve consistency level " + cl, e.getMessage());
+            }
+            else
+            {
+                throw new AssertionError(e);
+            }
+        }
+    }
+
+    /**
+     * Test that demonstrates the feature prevents write timeouts in certain cases
+     */
+    @Test
+    public void testWriteBehaviorWhenReplacementPendingNodeIsBusy() throws Exception
+    {
+        int numStartNodes = 3;
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(numStartNodes);
+        try (Cluster cluster = Cluster.build(numStartNodes)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                      .withDynamicPortAllocation(false)
+                                      .withRacks(1, 3, numStartNodes)
+                                      .withInstanceInitializer(ReplacementPendingWritesTest.BB::install)
+                                      .withTokenSupplier(node -> even.token(node == (numStartNodes + 1) ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+            IInvokableInstance node3 = cluster.get(3);
+            fixDistributedSchemas(cluster);
+            init(cluster);
+
+            cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE_SIMPLE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE_SIMPLE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE_NETWORKTOPOLOTY + " WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': 3};");
+            cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE_NETWORKTOPOLOTY + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            stopUnchecked(nodeToRemove);
+
+            // now create a new node to replace the stopped node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove, props -> {
+                // since we have a downed host, an old schema version may show up that
+                // can't be fetched because the host is down...
+                props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+            });
+
+            // enable the feature (write_requests_not_wait_on_pending_replacements) on node1 and disable it on node3
+            node1.runOnInstance(
+                () -> DatabaseDescriptor.setExcludeReplacementPendingForWrite(true)
+            );
+            node3.runOnInstance(
+                () -> DatabaseDescriptor.setExcludeReplacementPendingForWrite(false)
+            );
+
+            // drop messages to the new node to simulate the new node being busy
+            IMessageFilters.Filter disableMutationToNewNode = cluster.filters().inbound().verbs(Verb.MUTATION_REQ.id).to(4).drop();
+
+            // Test with the feature enabled while the pending node is busy.
+            List<TestData> testDataForFeatureEnabledWhenPendingNodeBusy = getTestDataForFeatureEnabledWhenPendingNodeBusy();
+            testHelperQueryExecutor(node1, testDataForFeatureEnabledWhenPendingNodeBusy);
+
+            // Test with the feature disabled while the pending node is busy
+            List<TestData> testDataForFeatureDisabledWhenPendingNodeBusy = getTestDataForFeatureDisabledWhenPendingNodeBusy();
+            testHelperQueryExecutor(node3, testDataForFeatureDisabledWhenPendingNodeBusy);
+
+            // allow the new node to receive mutations again; both nodes should now see successful writes
+            disableMutationToNewNode.off();
+            List<TestData> testDataForPendingNodeNotBusy = getTestDataForPendingNodeNotBusy(true);
+            // test with the feature enabled
+            testHelperQueryExecutor(node1, testDataForPendingNodeNotBusy);
+            // test with the feature disabled
+            testDataForPendingNodeNotBusy = getTestDataForPendingNodeNotBusy(false);
+            testHelperQueryExecutor(node3, testDataForPendingNodeNotBusy);
+
+            // with the feature enabled, if one natural replica cannot receive the mutation we get a write timeout,
+            // because the pending node's response will not be counted
+            IMessageFilters.Filter disableMutationFrom1To3 = cluster.filters().inbound().verbs(Verb.MUTATION_REQ.id).from(1).to(3).drop();
+            List<TestData> testDataForFeatureEnabledWhenNormalNodeBusy = getTestDataForFeatureEnabledWhenNormalNodeBusy();
+            testHelperQueryExecutor(node1, testDataForFeatureEnabledWhenNormalNodeBusy);
+            disableMutationFrom1To3.off();
+
+            // with the feature disabled, if one natural replica cannot receive the mutation we can still count
+            // the response from the pending node
+            IMessageFilters.Filter disableMutationFrom3To1 = cluster.filters().inbound().verbs(Verb.MUTATION_REQ.id).from(3).to(1).drop();
+            List<TestData> testDataForFeatureDisabledWhenNormalNodeBusy = getTestDataForFeatureDisabledWhenNormalNodeBusy();
+            testHelperQueryExecutor(node3, testDataForFeatureDisabledWhenNormalNodeBusy);
+            disableMutationFrom3To1.off();
+
+            // restore the replacement node so it can join
+            // unblock node 4 from joining
+            replacingNode.runOnInstance(
+                () -> {
+                    Assert.assertTrue(StorageService.instance.isJoining());
+                    // stop blocking the join so the bootstrap retry below can finish
+                    BB.keepNodeInPendingState.set(false);
+                }
+            );
+
+            // resume bootstrap to allow the new node to join
+            System.setProperty("cassandra.reset_bootstrap_progress", "false");
+            System.setProperty("cassandra.replace_address_first_boot", nodeToRemove.config().broadcastAddress().getAddress().getHostAddress());
+            replacingNode.nodetoolResult("bootstrap", "resume").asserts().success();
+
+            // wait till the replacing node is in the ring
+            awaitRingJoin(node1, replacingNode);
+            awaitRingJoin(node3, replacingNode);
+            awaitRingJoin(replacingNode, node1);
+            awaitRingJoin(replacingNode, node3);
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(node1);
+
+            assertRingIs(node1, node1, replacingNode, node3);
+        }
+    }
+
+    public static class BB
+    {
+        public static final AtomicBoolean keepNodeInPendingState = new AtomicBoolean(true);
+
+        public static void install(ClassLoader cl, Integer i)
+        {
+            // only install for the new node
+            if (i != 4)
+                return;
+            new ByteBuddy().rebase(StorageService.class)
+                           .method(named("bootstrapFinished"))
+                           .intercept(MethodDelegation.to(ReplacementPendingWritesTest.BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static void bootstrapFinished(@SuperCall Callable<?> zuper) throws Exception
+        {
+            while (keepNodeInPendingState.get())
+            {
+                logger.info("Keep node in pending state by throwing error");
+                throw new RuntimeException("Keep node in joining state");
+            }
+            logger.info("Done keeping node in pending state. Node will finish joining soon.");
+            zuper.call();
+        }
+    }
+
+    private class TestData
+    {
+        String keyspaceName;
+        ConsistencyLevel consistencyLevel;
+        boolean exceptionExpected;
+        int recievedAcks;
+
+        TestData(String keyspaceName, ConsistencyLevel consistencyLevel, boolean exceptionExpected, int recievedAcks)
+        {
+            this.keyspaceName = keyspaceName;
+            this.consistencyLevel = consistencyLevel;
+            this.exceptionExpected = exceptionExpected;
+            this.recievedAcks = recievedAcks;
+        }
+    }
+
+    private List<TestData> getTestDataForFeatureEnabledWhenPendingNodeBusy()
+    {
+        List<TestData> result = new ArrayList<>();
+        for (ConsistencyLevel cl : ConsistencyLevel.values())
+        {
+            switch (cl)
+            {
+                case ANY:
+                case ONE:
+                case TWO:
+                case QUORUM:
+                case LOCAL_ONE:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, false, 0));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, false, 0));
+                    break;
+                case LOCAL_QUORUM:
+                case EACH_QUORUM:
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, false, 0));
+                    break;
+                case THREE:
+                case ALL:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, true, 2));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, 2));
+                    break;
+                default:
+                    break;
+            }
+        }
+        return result;
+    }
+
+    private List<TestData> getTestDataForFeatureDisabledWhenPendingNodeBusy()
+    {
+        List<TestData> result = new ArrayList<>();
+        for (ConsistencyLevel cl : ConsistencyLevel.values())
+        {
+            switch (cl)
+            {
+                case ANY:
+                case ONE:
+                case LOCAL_ONE:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, false, 0));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, false, 0));
+                    break;
+                case LOCAL_QUORUM:
+                case EACH_QUORUM:
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, 2));
+                    break;
+                case THREE:
+                case ALL:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, true, 2));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, 2));
+                    break;
+                default:
+                    break;
+            }
+        }
+        return result;
+    }
+
+    private List<TestData> getTestDataForPendingNodeNotBusy(boolean featureEnabled)
+    {
+        List<TestData> result = new ArrayList<>();
+        for (ConsistencyLevel cl : ConsistencyLevel.values())
+        {
+            switch (cl)
+            {
+                case ANY:
+                case ONE:
+                case LOCAL_ONE:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, false, 0));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, false, 0));
+                    break;
+                case LOCAL_QUORUM:
+                case EACH_QUORUM:
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, false, 0));
+                    break;
+                case THREE:
+                case ALL:
+                    int recievedAcks = 3;
+                    if (featureEnabled)
+                        recievedAcks = 2;
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, true, recievedAcks));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, recievedAcks));
+                    break;
+                default:
+                    break;
+            }
+        }
+        return result;
+    }
+
+    private List<TestData> getTestDataForFeatureEnabledWhenNormalNodeBusy()
+    {
+        List<TestData> result = new ArrayList<>();
+        for (ConsistencyLevel cl : ConsistencyLevel.values())
+        {
+            switch (cl)
+            {
+                case ANY:
+                case ONE:
+                case LOCAL_ONE:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, false, 0));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, false, 0));
+                    break;
+                case LOCAL_QUORUM:
+                case EACH_QUORUM:
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, 1));
+                    break;
+                case THREE:
+                case ALL:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, true, 1));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, 1));
+                    break;
+                default:
+                    break;
+            }
+        }
+        return result;
+    }
+
+    private List<TestData> getTestDataForFeatureDisabledWhenNormalNodeBusy()
+    {
+        List<TestData> result = new ArrayList<>();
+        for (ConsistencyLevel cl : ConsistencyLevel.values())
+        {
+            switch (cl)
+            {
+                case ANY:
+                case ONE:
+                case LOCAL_ONE:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, false, 0));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, false, 0));
+                    break;
+                case LOCAL_QUORUM:
+                case EACH_QUORUM:
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, 2));
+                    break;
+                case THREE:
+                case ALL:
+                    result.add(new TestData(KEYSPACE_SIMPLE, cl, true, 2));
+                    result.add(new TestData(KEYSPACE_NETWORKTOPOLOTY, cl, true, 2));
+                    break;
+                default:
+                    break;
+            }
+        }
+        return result;
+    }
+
+    private void testHelperQueryExecutor(IInvokableInstance node, List<TestData> testData) throws Exception
+    {
+        for (TestData singleTestData : testData)
+        {
+            if (singleTestData.exceptionExpected)
+            {
+                String expectedMessage = String.format("Operation timed out - received only %d responses.", singleTestData.recievedAcks);
+                executeWriteExpectingException(node, 2, 2, 2, singleTestData.consistencyLevel, singleTestData.keyspaceName, expectedMessage);
+            }
+            else
+            {
+                executeSuccessfulWrite(node, 1, 1, 1, singleTestData.consistencyLevel, singleTestData.keyspaceName);
+            }
+        }
+    }
+}
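As a usage note, not part of the patch: a minimal sketch of how the new flag can be flipped and read back on a running instance from an in-JVM dtest, mirroring what the test above does with runOnInstance. The DatabaseDescriptor accessors and the yaml name write_requests_not_wait_on_pending_replacements come from the diff itself; the class name and the callOnInstance read-back are illustrative assumptions.

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.distributed.api.IInvokableInstance;

public final class ReplacementPendingWritesToggle
{
    // Enable the feature on a single coordinator, as the test does for node1.
    static void enable(IInvokableInstance node)
    {
        node.runOnInstance(() -> DatabaseDescriptor.setExcludeReplacementPendingForWrite(true));
    }

    // Read the current value back, e.g. for an assertion before running writes.
    static boolean isEnabled(IInvokableInstance node)
    {
        return node.callOnInstance(DatabaseDescriptor::isExcludeReplacementPendingForWrite);
    }
}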