Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1779,6 +1779,13 @@ drop_compact_storage_enabled: false
# group_by_enabled: true
# Guardrail to allow/disallow TRUNCATE and DROP TABLE statements
# drop_truncate_table_enabled: true
#
# During node replacement, exclude the replacement pending replica from write blockFor calculation
# Default: false (keeps current behavior where all pending replicas are included in blockFor)
# When enabled: only consistency level of natural replicas is required for writes during node replacement
# For normal bootstrap (non-replacement), pending replicas are still included
# write_requests_not_wait_on_pending_replacements: false
#
# Guardrail to warn or fail when using a page size greater than threshold.
# The two thresholds default to -1 to disable.
# page_size_warn_threshold: -1
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,12 @@ public static void setClientMode(boolean clientMode)
public volatile Set<ConsistencyLevel> read_consistency_levels_disallowed = Collections.emptySet();
public volatile Set<ConsistencyLevel> write_consistency_levels_warned = Collections.emptySet();
public volatile Set<ConsistencyLevel> write_consistency_levels_disallowed = Collections.emptySet();
// During node replacement, exclude the replacement pending replica from write blockFor calculation
// Default: false (keeps current behavior where all pending replicas are included in blockFor)
// When enabled: only consistency level of natural replicas is required for writes during node replacement
// For normal bootstrap (non-replacement), pending replicas are still included
public volatile boolean write_requests_not_wait_on_pending_replacements = false;

public volatile boolean user_timestamps_enabled = true;
public volatile boolean group_by_enabled = true;
public volatile boolean drop_truncate_table_enabled = true;
Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2137,6 +2137,13 @@ public static void setCompactionTombstoneWarningThreshold(int count)
conf.compaction_tombstone_warning_threshold = count;
}

public static boolean isExcludeReplacementPendingForWrite() { return conf.write_requests_not_wait_on_pending_replacements; }

public static void setExcludeReplacementPendingForWrite(boolean value)
{
conf.write_requests_not_wait_on_pending_replacements = value;
}

public static int getConcurrentValidations()
{
return conf.concurrent_validations;
Expand Down
33 changes: 31 additions & 2 deletions src/java/org/apache/cassandra/db/ConsistencyLevel.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.ProtocolException;

import java.util.Optional;

import static org.apache.cassandra.locator.Replicas.addToCountPerDc;

public enum ConsistencyLevel
Expand Down Expand Up @@ -174,23 +178,48 @@ public int blockForWrite(AbstractReplicationStrategy replicationStrategy, Endpoi
assert pending != null;

int blockFor = blockFor(replicationStrategy);

// Filter out replacement pending replicas if feature is enabled
Endpoints<?> pendingToAdd = pending;
if (DatabaseDescriptor.isExcludeReplacementPendingForWrite())
{
TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
if (tokenMetadata != null)
{
pendingToAdd = filterOutReplacementPendingReplicas(pending, tokenMetadata);
}
}

switch (this)
{
case ANY:
break;
case LOCAL_ONE: case LOCAL_QUORUM: case LOCAL_SERIAL:
// we will only count local replicas towards our response count, as these queries only care about local guarantees
blockFor += pending.count(InOurDc.replicas());
blockFor += pendingToAdd.count(InOurDc.replicas());
break;
case ONE: case TWO: case THREE:
case QUORUM: case EACH_QUORUM:
case SERIAL:
case ALL:
blockFor += pending.size();
blockFor += pendingToAdd.size();
}
return blockFor;
}

/**
* Filters out replacement pending replicas from the given endpoints collection.
* A pending replica is considered a replacement if it's replacing an existing node.
*
* @param pending the endpoints to filter
* @param tokenMetadata the token metadata containing replacement node information
* @return filtered endpoints excluding replacement pending replicas
*/
private static Endpoints<?> filterOutReplacementPendingReplicas(Endpoints<?> pending, TokenMetadata tokenMetadata)
{
return pending.filter(replica -> tokenMetadata.getReplacingNode(replica.endpoint()).isEmpty());
}

/**
* Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace
* WARNING: this is not locality aware; you cannot safely use this with mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM)
Expand Down
1 change: 0 additions & 1 deletion src/java/org/apache/cassandra/locator/TokenMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,6 @@ public Optional<InetAddressAndPort> getReplacingNode(InetAddressAndPort endpoint
lock.readLock().unlock();
}
}

public void removeBootstrapTokens(Collection<Token> tokens)
{
assert tokens != null && !tokens.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@

import javax.annotation.Nullable;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;

import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.concurrent.Condition;
import org.slf4j.Logger;
Expand Down Expand Up @@ -247,7 +249,24 @@ public ConsistencyLevel consistencyLevel()
*/
protected boolean waitingFor(InetAddressAndPort from)
{
return true;
return !shouldSkipWaiting(from);
}

protected boolean shouldSkipWaiting(InetAddressAndPort from)
{
// If feature is enabled, don't wait for replacement pending replicas
if (DatabaseDescriptor.isExcludeReplacementPendingForWrite())
{
TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();

if (tokenMetadata != null && (tokenMetadata.getReplacingNode(from).isPresent() || tokenMetadata.getReplacementNode(from).isPresent()))
{
// This is a replacement node or a node being replaced, don't count its response
return true;
}
}
return false;

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.WriteType;
Expand Down Expand Up @@ -69,8 +70,12 @@ public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,

// During bootstrap, we have to include the pending endpoints or we may fail the consistency level
// guarantees (see #833)
// However, if feature is enabled, exclude replacement pending replicas
for (Replica pending : replicaPlan.pending())
{
// Skip replacement pending replicas if feature is enabled
if (shouldSkipWaiting(pending.endpoint()))
continue;
responses.get(snitch.getDatacenter(pending)).incrementAndGet();
}
}
Expand All @@ -79,6 +84,13 @@ public void onResponse(Message<T> message)
{
try
{
// Only process if we're waiting for this response
if (message != null && !waitingFor(message.from()))
{
logResponseToIdealCLDelegate(message);
return;
}

String dataCenter = message == null
? DatabaseDescriptor.getLocalDataCenter()
: snitch.getDatacenter(message.from());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/
package org.apache.cassandra.service;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.locator.InOurDc;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.transport.Dispatcher;

Expand Down Expand Up @@ -63,6 +65,10 @@ public void onResponse(Message<T> message)
@Override
protected boolean waitingFor(InetAddressAndPort from)
{
return waitingFor.test(from);
// First check if it's in our local DC
if (!waitingFor.test(from))
return false;

return super.waitingFor(from);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeTyp

public void onResponse(Message<T> m)
{
if (responsesUpdater.decrementAndGet(this) == 0)
signal();
// Only decrement if we're waiting for this response
// if m is null, it means the response is from local
if (m == null || waitingFor(m.from()))
{
if (responsesUpdater.decrementAndGet(this) == 0)
signal();
}
//Must be last after all subclass processing
//The two current subclasses both assume logResponseToIdealCLDelegate is called
//here.
Expand Down
Loading