Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/java/org/apache/cassandra/db/CounterMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,21 @@ public ConsistencyLevel consistency()
* @return the applied resulting Mutation
*/
public Mutation applyCounterMutation() throws WriteTimeoutException
{
// Delegates to the ID-aware overload with a null mutation ID; that overload
// documents null as the non-tracked case, so no ID is attached to the result here.
return applyCounterMutation(null);
}

/**
* Applies the counter mutation with an optional mutation ID for tracked keyspaces.
*
* For tracked keyspaces, the mutation ID is assigned to the concrete result BEFORE
* it is applied, ensuring the concrete counter values (not the operation) are
* journaled and tracked with the ID.
*
* @param mutationId the mutation ID to assign to the concrete result, or null for non-tracked
* @return the applied resulting Mutation (with ID if provided)
*/
public Mutation applyCounterMutation(MutationId mutationId) throws WriteTimeoutException
{
Mutation.PartitionUpdateCollector resultBuilder = new Mutation.PartitionUpdateCollector(id(), getKeyspaceName(), key());
Keyspace keyspace = Keyspace.open(getKeyspaceName());
Expand All @@ -173,6 +188,12 @@ public Mutation applyCounterMutation() throws WriteTimeoutException
resultBuilder.add(processModifications(upd));

Mutation result = resultBuilder.build();

// For tracked keyspaces, assign the mutation ID to the result before
// calling result.apply() since applyInternalTracked() requires an ID
if (mutationId != null && !mutationId.isNone())
result = result.withMutationId(mutationId);

result.apply();
return result;
}
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.replication.ForwardedWrite;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.transport.Dispatcher;

Expand All @@ -38,6 +39,15 @@ protected void applyMutation(final Message<CounterMutation> message, InetAddress
final CounterMutation cm = message.payload;
logger.trace("Applying forwarded {}", cm);

Keyspace keyspace = Keyspace.open(cm.getKeyspaceName());

if (keyspace.getMetadata().useMutationTracking())
{
logger.trace("Applying tracked forwarded counter mutation {}", cm);
ForwardedWrite.applyForwardedCounterMutation(cm, message, respondToAddress);
return;
}

String localDataCenter = DatabaseDescriptor.getLocator().local().datacenter;
// We should not wait for the result of the write in this thread,
// otherwise we could have a distributed deadlock between replicas
Expand Down
114 changes: 114 additions & 0 deletions src/java/org/apache/cassandra/replication/ForwardedWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.TrackedWriteResponseHandler;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.transport.Dispatcher;
Expand Down Expand Up @@ -241,6 +242,119 @@ public static AbstractWriteResponseHandler<Object> forwardMutation(Mutation muta
return handler;
}

/**
 * Forwards a tracked counter mutation, which must not yet carry a mutation ID, to the
 * counter leader replica chosen by {@code ReplicaPlans.findCounterLeaderReplica}.
 *
 * A single {@code COUNTER_MUTATION_REQ} message is built and sent to the leader only.
 * A callback for that same message is registered for every live contact in the plan,
 * so that responses from those replicas can be delivered to the returned handler.
 * Contacts that are not alive are marked as expired on the handler straight away.
 *
 * @param counterMutation the counter mutation to forward; its ID must be "none"
 * @param plan            the write replica plan whose contacts are expected to respond
 * @param strategy        replication strategy used to create the write response handler
 * @param requestTime     request timing information attached to the handler and the message
 * @return the write response handler tracking responses for the forwarded write
 */
public static AbstractWriteResponseHandler<Object> forwardCounterMutation(CounterMutation counterMutation,
                                                                          ReplicaPlan.ForWrite plan,
                                                                          AbstractReplicationStrategy strategy,
                                                                          Dispatcher.RequestTime requestTime)
{
    Preconditions.checkArgument(counterMutation.id().isNone(), "CounterMutation should not have an ID when forwarding");

    ClusterMetadata metadata = ClusterMetadata.current();
    String localDc = DatabaseDescriptor.getLocator().local().datacenter;

    // The local datacenter is passed to the leader lookup (the original comment notes
    // local DC replicas are preferred for counters).
    Replica leader;
    try
    {
        leader = ReplicaPlans.findCounterLeaderReplica(metadata,
                                                       counterMutation.getKeyspaceName(),
                                                       counterMutation.key(),
                                                       localDc,
                                                       counterMutation.consistency());
    }
    catch (Exception lookupFailure)
    {
        // Log for diagnosis, then let the caller see the original exception.
        logger.error("Failed to find counter leader replica for tracked write", lookupFailure);
        throw lookupFailure;
    }

    // This method only handles forwarding to another node.
    Preconditions.checkState(!leader.isSelf(), "Leader should not be self when forwarding counter mutation");
    logger.trace("Forwarding tracked counter mutation to leader replica {}", leader);

    // One handler collects the responses of every replica in the plan.
    AbstractWriteResponseHandler<Object> responseHandler =
        strategy.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, requestTime);

    Message<CounterMutation> request =
        Message.outWithRequestTime(Verb.COUNTER_MUTATION_REQ, counterMutation, requestTime);

    for (Replica contact : plan.contacts())
    {
        if (!plan.isAlive(contact))
        {
            // Nothing will ever arrive from a dead replica: count it as expired now.
            responseHandler.expired();
            continue;
        }

        logger.trace("Adding forwarding callback for tracked counter response from {} id {}", contact, request.id());
        MessagingService.instance().callbacks.addWithExpiration(responseHandler, request, contact);
    }

    // Only the leader actually receives the counter mutation.
    MessagingService.instance().send(request, leader.endpoint());

    return responseHandler;
}

/**
 * Applies a forwarded tracked counter mutation on the leader replica.
 * Per the original documentation, this is invoked by CounterMutationVerbHandler when a
 * forwarded counter write is received.
 *
 * Steps, in order:
 * 1. Build the CoordinatorAckInfo from the sender and ID of the incoming message
 * 2. Build a write plan for the key's token and generate a new mutation ID
 * 3. Apply the counter mutation locally with that ID, yielding a plain Mutation
 * 4. Send that Mutation (not the CounterMutation) to the other replicas together
 *    with the CoordinatorAckInfo
 * 5. Send an empty response for the incoming message to {@code respondToAddress}
 *
 * Any exception raised along the way is logged and not rethrown.
 * NOTE(review): in that case this method sends no response at all, so the coordinator
 * presumably only learns of the failure via its own timeout - confirm this is intended.
 *
 * @param counterMutation the counter mutation to apply
 * @param message the original message (contains coordinator address and message ID)
 * @param respondToAddress the address to send the response to (coordinator)
 */
public static void applyForwardedCounterMutation(CounterMutation counterMutation,
                                                 Message<CounterMutation> message,
                                                 InetAddressAndPort respondToAddress)
{
    try
    {
        CoordinatorAckInfo ackInfo = CoordinatorAckInfo.toCoordinator(message.from(), message.id());

        String ksName = counterMutation.getKeyspaceName();
        Token keyToken = counterMutation.key().getToken();
        Keyspace keyspace = Keyspace.open(ksName);
        ReplicaPlan.ForWrite writePlan =
            ReplicaPlans.forWrite(keyspace, counterMutation.consistency(), keyToken, ReplicaPlans.writeAll);
        AbstractReplicationStrategy replication = writePlan.replicationStrategy();

        MutationId mutationId = MutationTrackingService.instance.nextMutationId(ksName, keyToken);

        if (logger.isTraceEnabled())
            logger.trace("Forwarded counter mutation {}: applying locally with ID and forwarding to other replicas", mutationId);

        // The handler is created with a fresh "immediate execution" request time rather
        // than one taken from the incoming message.
        TrackedWriteResponseHandler tracker = TrackedWriteResponseHandler.wrap(
            replication.getWriteResponseHandler(writePlan,
                                                null,
                                                WriteType.COUNTER,
                                                null,
                                                Dispatcher.RequestTime.forImmediateExecution()),
            mutationId);

        // Applying the counter mutation with the ID returns the resulting Mutation.
        Mutation applied = counterMutation.applyCounterMutation(mutationId);

        // The other replicas get the result along with the coordinator ack info, so that
        // they respond to the coordinator rather than to this leader.
        TrackedWriteRequest.sendToReplicasOnly(applied, writePlan, tracker, ackInfo);

        // Finally, this leader's own response goes back to the coordinator.
        MessagingService.instance().send(message.emptyResponse(), respondToAddress);

        logger.trace("Tracked counter mutation {} processed, response sent to {}", mutationId, respondToAddress);
    }
    catch (Exception failure)
    {
        logger.error("Error applying forwarded tracked counter mutation {}", counterMutation, failure);
    }
}

public static final IVersionedSerializer<MutationRequest> serializer = new IVersionedSerializer<>()
{
@Override
Expand Down
156 changes: 147 additions & 9 deletions src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class TrackedWriteRequest
* @param requestTime object holding times when request got enqueued and started execution
*/
public static AbstractWriteResponseHandler<?> perform(
Mutation mutation, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
IMutation mutation, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
{
Tracing.trace("Determining replicas for mutation");

Expand All @@ -95,23 +96,45 @@ public static AbstractWriteResponseHandler<?> perform(
if (logger.isTraceEnabled())
logger.trace("Remote tracked request {} {}", mutation, plan);
writeMetrics.remoteRequests.mark();
return ForwardedWrite.forwardMutation(mutation, plan, rs, requestTime);

if (mutation instanceof CounterMutation)
return ForwardedWrite.forwardCounterMutation((CounterMutation) mutation, plan, rs, requestTime);
else
return ForwardedWrite.forwardMutation((Mutation) mutation, plan, rs, requestTime);
}

if (logger.isTraceEnabled())
logger.trace("Local tracked request {} {}", mutation, plan);
writeMetrics.localRequests.mark();

MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
mutation = mutation.withMutationId(id);

if (logger.isTraceEnabled())
logger.trace("Write replication plan for mutation {}: live={}, pending={}, all={}",
id, plan.live(), plan.pending(), plan.contacts());
if (mutation instanceof CounterMutation)
{
if (logger.isTraceEnabled())
logger.trace("Write replication plan for counter mutation {}: live={}, pending={}, all={}",
id, plan.live(), plan.pending(), plan.contacts());

TrackedWriteResponseHandler handler =
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.COUNTER, null, requestTime), id);

Mutation result = ((CounterMutation) mutation).applyCounterMutation(id);
sendToReplicasOnly(result, plan, handler, null);
return handler;
}
else
{
mutation = mutation.withMutationId(id);

if (logger.isTraceEnabled())
logger.trace("Write replication plan for mutation {}: live={}, pending={}, all={}",
id, plan.live(), plan.pending(), plan.contacts());

TrackedWriteResponseHandler handler =
TrackedWriteResponseHandler handler =
TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime), id);
applyLocallyAndSendToReplicas(mutation, plan, handler);
return handler;
applyLocallyAndSendToReplicas((Mutation) mutation, plan, handler);
return handler;
}
}

public static void applyLocallyAndSendToReplicas(Mutation mutation, ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler)
Expand Down Expand Up @@ -221,6 +244,121 @@ public static void applyLocallyAndSendToReplicas(Mutation mutation, ReplicaPlan.
}
}

/**
 * Sends a mutation to the remote replicas of {@code plan} only, without applying it locally.
 * This is used for counter mutations: the caller has already produced the concrete result,
 * assigned the mutation ID to it and applied it locally (via applyCounterMutation) before
 * calling this method.
 *
 * Behaviour per contact in the plan:
 * - a replica that is not alive is marked as expired on the handler and skipped;
 * - this node itself is skipped (it must be present, otherwise the state check fails);
 * - every other replica is grouped by datacenter and sent the mutation.
 *
 * After the contacts have been classified, the handler is notified once of the local write
 * through {@code handler.onResponse(null)}. If at least one remote replica was contacted,
 * the node IDs of all contacted replicas are reported to
 * {@code MutationTrackingService.sentWriteRequest}.
 *
 * @param mutation the mutation with assigned ID to send to replicas
 * @param plan the replica plan
 * @param handler the response handler
 * @param coordinatorAckInfo optional coordinator info for forwarded writes (null for local coordinator)
 * @throws IllegalStateException if this node is not among the live contacts of the plan
 *                               (assuming Guava-style {@code Preconditions.checkState})
 */
public static void sendToReplicasOnly(Mutation mutation, ReplicaPlan.ForWrite plan, TrackedWriteResponseHandler handler, ForwardedWrite.CoordinatorAckInfo coordinatorAckInfo)
{
    String localDataCenter = DatabaseDescriptor.getLocator().local().datacenter;
    List<Replica> localDCReplicas = null;
    Map<String, List<Replica>> remoteDCReplicas = null;

    // Built lazily: only needed once the first live non-local replica is seen
    Message<Mutation> message = null;

    // Serialize this mutation now so when we send it to multiple replicas concurrently,
    // they all use the cached serialized bytes instead of re-serializing it multiple times.
    Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version);

    boolean foundSelf = false;
    for (Replica destination : plan.contacts())
    {
        if (!plan.isAlive(destination))
        {
            if (logger.isTraceEnabled())
                logger.trace("Skipping dead replica {} for mutation {}", destination, mutation.id());
            handler.expired(); // immediately mark the response as expired since the request will not be sent
            continue;
        }

        if (destination.isSelf())
        {
            foundSelf = true; // Mutation was already applied locally
            continue;
        }

        if (message == null)
        {
            Message.Builder<Mutation> builder = Message.builder(MUTATION_REQ, mutation)
                                                       .withRequestTime(handler.getRequestTime())
                                                       .withFlag(MessageFlag.CALL_BACK_ON_FAILURE);

            // If this is a forwarded write, include coordinator ack info so replicas
            // know to respond to the original coordinator, not this leader
            if (coordinatorAckInfo != null)
                builder.withParam(ParamType.COORDINATOR_ACK_INFO, coordinatorAckInfo);

            message = builder.build();
        }

        String dc = DatabaseDescriptor.getLocator().location(destination.endpoint()).datacenter;

        if (localDataCenter.equals(dc))
        {
            if (localDCReplicas == null)
                localDCReplicas = new ArrayList<>(plan.contacts().size());
            localDCReplicas.add(destination);
        }
        else
        {
            if (remoteDCReplicas == null)
                remoteDCReplicas = new HashMap<>();

            // Single lookup: computeIfAbsent already returns the existing list when present.
            // most DCs will have <= 3 replicas
            remoteDCReplicas.computeIfAbsent(dc, ignore -> new ArrayList<>(3)).add(destination);
        }
    }

    Preconditions.checkState(foundSelf, "Coordinator must be a replica for tracked counter mutations");

    // Notify handler that local write succeeded (mutation was already applied before calling this method)
    handler.onResponse(null);

    // No live remote replica: nothing to send and nothing to report
    if (localDCReplicas == null && remoteDCReplicas == null)
        return;

    // Resolve all peer IDs against one metadata snapshot instead of re-reading the
    // current metadata for every replica.
    ClusterMetadata metadata = ClusterMetadata.current();
    IntHashSet remoteReplicas = new IntHashSet();

    if (localDCReplicas != null)
    {
        for (Replica replica : localDCReplicas)
        {
            if (logger.isTraceEnabled())
                logger.trace("Sending mutation {} to local replica {}", mutation.id(), replica);
            MessagingService.instance().sendWriteWithCallback(message, replica, handler);
            remoteReplicas.add(metadata.directory.peerId(replica.endpoint()).id());
        }
    }

    if (remoteDCReplicas != null)
    {
        // for each datacenter, send the message to one node to relay the write to other replicas
        for (List<Replica> dcReplicas : remoteDCReplicas.values())
        {
            if (logger.isTraceEnabled())
                logger.trace("Sending mutation {} to remote dc replicas {}", mutation.id(), dcReplicas);
            sendMessagesToRemoteDC(message, EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, coordinatorAckInfo);
            for (Replica replica : dcReplicas)
                remoteReplicas.add(metadata.directory.peerId(replica.endpoint()).id());
        }
    }

    if (logger.isTraceEnabled())
        logger.trace("Sending mutation {} to remote replicas {}", mutation.id(), remoteReplicas);
    MutationTrackingService.instance.sentWriteRequest(mutation, remoteReplicas);
}

static void applyMutationLocally(Mutation mutation, RequestCallback<NoPayload> handler)
{
Preconditions.checkArgument(handler instanceof TrackedWriteResponseHandler || handler instanceof ForwardedWrite.LeaderCallback);
Expand Down
Loading