Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
27743d2
[CASSANDRA-20736] Add CMS membership directly to ClusterMetadata
beobal Jun 10, 2025
6f1b7de
[CASSANDRA-20736] Init singleton CMS cluster with restarts
beobal Jun 11, 2025
c7871bd
[CASSANDRA-20736] Update CMS reconfiguration
beobal Jun 12, 2025
8a68f97
[CASSANDRA-20736] Update Startup transformation
beobal Jun 12, 2025
307e878
[CASSANDRA-20736] Update legacy CMS membership transformations - TODO…
beobal Jun 12, 2025
5db6f87
[CASSANDRA-20736] New MetadataKey for CMS membership
beobal Jun 13, 2025
5a9ccbf
[CASSANDRA-20736] Support for upgrades from gossip & from earlier ver…
beobal Jun 13, 2025
4c012b1
[CASSANDRA-20736] Update CancelCMSReconfiguration
beobal Jun 13, 2025
c9faf3f
[CASSANDRA-20736] Properly set lastModified on DataPlacements
beobal Jun 16, 2025
12a2f55
[CASSANDRA-20736] AtomicLongProcessor can always accept commits
beobal Jun 16, 2025
36d62e7
[CASSANDRA-20736] Separate MetaStrategy placements from others
beobal Jun 18, 2025
cb9ebbb
[CASSANDRA-20736] Make DataPlacements private on ClusterMetadata
beobal Jun 19, 2025
aaa2049
[CASSANDRA-20736] Test fixes
beobal Jun 16, 2025
01b2350
[CASSANDRA-20736] Rework CMS initialization
beobal Jan 23, 2026
51e1c6c
[CASSANDRA-20476] Add dtest for CMS rediscovery
beobal May 29, 2025
d346661
[CASSANDRA-20476] Prep for CMSLookup
beobal Jun 23, 2025
bed6f56
[CASSANDRA-20476] Introduce CMSLookup
beobal Jun 23, 2025
7807fd5
[CASSANDRA-20476] Perform rediscovery of CMS at startup if addresses …
beobal Jun 2, 2025
ba2256c
[CASSANDRA-20476] Attempt to wait for all address changes to be enact…
beobal Jun 25, 2025
76e9f1d
[CASSANDRA-20476] Some minor logging additions
beobal Jun 24, 2025
214261f
[CASSANDRA-20476] Make sure to start messaging service
Aug 5, 2025
a52733a
[CASSANDRA-20476] Don't attempt to catch up from peers or CMS while l…
beobal Jan 22, 2026
dc2fab7
Make shadow gossip round parameters configurable for testing
beobal Jan 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ public enum CassandraRelevantProperties
// transactional cluster metadata relevant properties
// TODO: not a fan of being forced to prefix these to satisfy the alphabetic ordering constraint
// but it makes sense to group logically related properties together
TCM_SHADOW_ROUND_MAX_ATTEMPTS("cassandra.shadow_round_max_attempts", "3"),
TCM_SHADOW_ROUND_TIMEOUT("cassandra.shadow_round_timeout_millis", "15000"),
/**
* For testing purposes, disable the automatic CMS reconfiguration after a bootstrap/replace/move operation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Keyspaces apply(ClusterMetadata metadata)
// as we have as keys in metadata.placements to have a fast map lookup
// ReplicationParams are immutable, so it is a safe optimization
KeyspaceParams keyspaceParams = attrs.asNewKeyspaceParams();
ReplicationParams replicationParams = metadata.placements.deduplicateReplicationParams(keyspaceParams.replication);
ReplicationParams replicationParams = metadata.placements().deduplicateReplicationParams(keyspaceParams.replication);
keyspaceParams = keyspaceParams.withSwapped(replicationParams);
KeyspaceMetadata keyspaceMetadata = KeyspaceMetadata.create(keyspaceName, keyspaceParams);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,6 @@ else if (message.epoch().isBefore(metadata.schema.lastModified()))

private static VersionedEndpoints.ForToken writePlacements(ClusterMetadata metadata, String keyspace, DecoratedKey key)
{
return metadata.placements.get(metadata.schema.getKeyspace(keyspace).getMetadata().params.replication).writes.forToken(key.getToken());
return metadata.placement(metadata.schema.getKeyspace(keyspace).getMetadata().params.replication).writes.forToken(key.getToken());
}
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/DiskBoundaryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, ClusterMet
if (StorageService.instance.isBootstrapMode()
&& !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally
{
placement = metadata.placements.get(cfs.keyspace.getMetadata().params.replication);
placement = metadata.placement(cfs.keyspace.getMetadata().params.replication);
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ private ClusterMetadata checkTokenOwnership(ClusterMetadata metadata, Message<Re

private static Replica getLocalReplica(ClusterMetadata metadata, Token token, String keyspace)
{
return metadata.placements
.get(metadata.schema.getKeyspaces().getNullable(keyspace).params.replication)
return metadata
.placement(metadata.schema.getKeyspaces().getNullable(keyspace).params.replication)
.reads
.forToken(token)
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jo
// we only consider write placements during cleanup as range movements always ensure
// overlap between new replicas accepting reads and old replicas accepting writes
ClusterMetadata cm = ClusterMetadata.current();
DataPlacement placement = cm.placements.get(keyspace.getMetadata().params.replication);
DataPlacement placement = cm.placement(keyspace.getMetadata().params.replication);
InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
RangesAtEndpoint localWrites = placement.writes.byEndpoint().get(local);
// TODO review: Hack to get local partitioner not to fail out because it's handled very poorly with data placements
Expand Down
6 changes: 4 additions & 2 deletions src/java/org/apache/cassandra/db/view/ViewUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.ownership.DataPlacement;

public final class ViewUtils
{
Expand Down Expand Up @@ -64,8 +65,9 @@ public static Optional<Replica> getViewNaturalEndpoint(ClusterMetadata metadata,
Location local = metadata.locator.local();
KeyspaceMetadata keyspaceMetadata = metadata.schema.getKeyspaces().getNullable(keyspace);

EndpointsForToken naturalBaseReplicas = metadata.placements.get(keyspaceMetadata.params.replication).reads.forToken(baseToken).get();
EndpointsForToken naturalViewReplicas = metadata.placements.get(keyspaceMetadata.params.replication).reads.forToken(viewToken).get();
DataPlacement placement = metadata.placement(keyspaceMetadata.params.replication);
EndpointsForToken naturalBaseReplicas = placement.reads.forToken(baseToken).get();
EndpointsForToken naturalViewReplicas = placement.reads.forToken(viewToken).get();

Optional<Replica> localReplica = Iterables.tryFind(naturalViewReplicas, Replica::isSelf).toJavaUtil();
if (localReplica.isPresent())
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/gms/FailureDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ public boolean isAlive(InetAddressAndPort ep)
// registration via the metadata log, or a full gossip round). This is perfectly harmless, so no need to log
// an error in that case.
ClusterMetadata metadata = ClusterMetadata.current();
if (metadata.cmsLookup.isActive() && metadata.fullCMSMembers().contains(ep))
{
logger.trace("Found endpoint {} in active CMS lookup, assuming it is alive", ep);
return true;
}
if (!metadata.directory.allJoinedEndpoints().contains(ep) && !metadata.fullCMSMembers().contains(ep))
logger.error("Unknown endpoint: " + ep, new UnknownEndpointException(ep));
}
Expand Down
7 changes: 5 additions & 2 deletions src/java/org/apache/cassandra/gms/NewGossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
Expand Down Expand Up @@ -68,15 +69,17 @@ public Map<InetAddressAndPort, EndpointState> doShadowRound()
handler = shadowRoundHandler;

int tries = 0;
int maxTries = CassandraRelevantProperties.TCM_SHADOW_ROUND_MAX_ATTEMPTS.getInt();
long timeout = CassandraRelevantProperties.TCM_SHADOW_ROUND_TIMEOUT.getLong();
while (true)
{
try
{
return shadowRoundHandler.doShadowRound().get(15, TimeUnit.SECONDS);
return shadowRoundHandler.doShadowRound().get(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
if (++tries > 3)
if (++tries >= maxTries)
break;
logger.warn("Got no response for shadow round");
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/hints/HintsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private void rehintHintsNeedingRehinting()
// Also may need to apply locally because it's possible this is from the batchlog
// and we never applied it locally
// TODO (review): Additional error handling necessary? Hints are lossy
DataPlacement dataPlacement = cm.placements.get(cm.schema.getKeyspace(mutation.getKeyspaceName()).getMetadata().params.replication);
DataPlacement dataPlacement = cm.placement(cm.schema.getKeyspace(mutation.getKeyspaceName()).getMetadata().params.replication);
VersionedEndpoints.ForToken forToken = dataPlacement.writes.forToken(mutation.key().getToken());
Replica self = forToken.get().selfIfPresent();
if (self != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public Set<NodeId> reconfigure(ClusterMetadata metadata)

// Although MetaStrategy has its own entireRange, it uses a custom partitioner which isn't compatible with
// regular, non-CMS placements. For that reason, we select replicas here using tokens provided by the
// globally configured partitioner.
// globally configured partitioner. This also has the benefit of making concurrent operations, such as
// bounces/upgrades/etc, safe for the CMS if they are replica aware.
Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
EndpointsForRange endpoints = NetworkTopologyStrategy.calculateNaturalReplicas(minToken,
new Range<>(minToken, minToken),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public static EndpointsForToken copyOf(Token token, Iterable<Replica> replicas)

public static VersionedEndpoints.ForToken natural(Keyspace keyspace, Token token)
{
return ClusterMetadata.current().placements.get(keyspace.getMetadata().params.replication).reads.forToken(token);
return ClusterMetadata.current().placement(keyspace.getMetadata().params.replication).reads.forToken(token);
}

}
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/locator/MetaStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ public MetaStrategy(String keyspaceName, Map<String, String> configOptions)
@Override
public EndpointsForRange calculateNaturalReplicas(Token token, ClusterMetadata metadata)
{
return metadata.placements.get(ReplicationParams.meta(metadata)).reads.forRange(entireRange).get();
return metadata.placement(ReplicationParams.meta(metadata)).reads.forRange(entireRange).get();
}

@Override
public DataPlacement calculateDataPlacement(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata)
{
return metadata.placements.get(ReplicationParams.meta(metadata));
return metadata.placement(ReplicationParams.meta(metadata));
}

@Override
Expand Down
16 changes: 5 additions & 11 deletions src/java/org/apache/cassandra/locator/ReplicaLayout.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public static ReplicaLayout.ForTokenWrite forTokenWriteLiveAndDown(ClusterMetada
{
// todo deduplicate so that "pending" contains "read - write",
// which is a hack until we revisit how consistency level handles pending
DataPlacement dataPlacement = metadata.placements.get(ks.params.replication);
DataPlacement dataPlacement = metadata.placement(ks.params.replication);
natural = forNonLocalStrategyTokenRead(dataPlacement, token);
// perf optimization to avoid double endpoints search and filtering for a typical case
// DataPlacement constructor does a deduplication of reads/writes, so we can use cheap == comparison here
Expand Down Expand Up @@ -394,30 +394,24 @@ static ReplicaLayout.ForRangeRead forRangeReadSorted(ClusterMetadata metadata, K

static EndpointsForRange forNonLocalStategyRangeRead(ClusterMetadata metadata, KeyspaceMetadata keyspace, AbstractBounds<PartitionPosition> range)
{
return metadata.placements.get(keyspace.params.replication).reads.forRange(range.right.getToken()).get();
return metadata.placement(keyspace.params.replication).reads.forRange(range.right.getToken()).get();
}

public static EndpointsForToken forNonLocalStrategyTokenRead(ClusterMetadata metadata, KeyspaceMetadata keyspace, Token token)
{
return forNonLocalStrategyTokenRead(metadata.placements.get(keyspace.params.replication), token);
return forNonLocalStrategyTokenRead(metadata.placement(keyspace.params.replication), token);
}

public static EndpointsForToken forNonLocalStrategyTokenRead(DataPlacement dataPlacement, Token token)
private static EndpointsForToken forNonLocalStrategyTokenRead(DataPlacement dataPlacement, Token token)
{
return dataPlacement.reads.forToken(token).get();
}

static EndpointsForToken forNonLocalStrategyTokenWrite(ClusterMetadata metadata, KeyspaceMetadata keyspace, Token token)
{
return forNonLocalStrategyTokenWrite(metadata.placements.get(keyspace.params.replication), token);
}

static EndpointsForToken forNonLocalStrategyTokenWrite(DataPlacement dataPlacement, Token token)
private static EndpointsForToken forNonLocalStrategyTokenWrite(DataPlacement dataPlacement, Token token)
{
return dataPlacement.writes.forToken(token).get();
}


static EndpointsForRange forLocalStrategyRange(ClusterMetadata metadata, AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
{
return replicationStrategy.calculateNaturalReplicas(range.right.getToken(), metadata);
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/locator/ReplicaPlans.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public static Replica findCounterLeaderReplica(ClusterMetadata metadata, String
NodeProximity proximity = DatabaseDescriptor.getNodeProximity();
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();

EndpointsForToken replicas = metadata.placements.get(keyspace.getMetadata().params.replication).reads.forToken(key.getToken()).get();
EndpointsForToken replicas = metadata.placement(keyspace.getMetadata().params.replication).reads.forToken(key.getToken()).get();

// CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping
replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint()));
Expand Down
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/net/MessageDelivery.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,21 @@ static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> fanoutAndWait(Messag
@Override
public void onResponse(Message<RSP> msg)
{
logger.info("Received a {} response from {}: {}", msg.verb(), msg.from(), msg.payload);
logger.debug("Received a {} response from {}: {}", msg.verb(), msg.from(), msg.payload);
responses.add(Pair.create(msg.from(), msg.payload));
cdl.decrement();
}

@Override
public void onFailure(InetAddressAndPort from, RequestFailure reason)
{
logger.info("Received failure in response to {} from {}: {}", verb, from, reason);
logger.debug("Received failure in response to {} from {}: {}", verb, from, reason);
cdl.decrement();
}
};

sendTo.forEach((ep) -> {
logger.info("Election for metadata migration sending {} ({}) to {}", verb, payload.toString(), ep);
logger.debug("Sending {} ({}) to {}", verb, payload.toString(), ep);
messaging.sendWithCallback(Message.out(verb, payload), ep, callback);
});
cdl.awaitUninterruptibly(timeout, timeUnit);
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class ResponseVerbHandler implements IVerbHandler
Verb.TCM_REPLICATION,
Verb.TCM_NOTIFY_RSP,
Verb.TCM_DISCOVER_RSP,
Verb.TCM_DISCOVER_PEERS_RSP,
Verb.TCM_DISCOVER_SURVEY_RSP,
Verb.TCM_INIT_MIG_RSP);

// We skip epoch catchup for PaxosV2 verbs, since we are using PaxosV2 to serially read the log.
Expand Down
6 changes: 5 additions & 1 deletion src/java/org/apache/cassandra/net/Verb.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,12 @@ public enum Verb
TCM_ABORT_MIG (811, P0, rpcTimeout, INTERNAL_METADATA, () -> CMSInitializationRequest.Initiator.serializer,() -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ),
TCM_DISCOVER_RSP (812, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, RESPONSE_HANDLER ),
TCM_DISCOVER_REQ (813, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ),
TCM_FETCH_PEER_LOG_RSP (818, P0, shortTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, RESPONSE_HANDLER ),
TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout, FETCH_METADATA, MessageSerializers::logStateSerializer, () -> ResponseVerbHandler.instance ),
TCM_FETCH_PEER_LOG_REQ (819, P0, rpcTimeout, FETCH_METADATA, () -> FetchPeerLog.serializer, () -> FetchPeerLog.Handler.instance, TCM_FETCH_PEER_LOG_RSP ),
TCM_DISCOVER_PEERS_RSP (820, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, () -> ResponseVerbHandler.instance ),
TCM_DISCOVER_PEERS_REQ (821, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_PEERS_RSP),
TCM_DISCOVER_SURVEY_RSP(822, P0, rpcTimeout, INTERNAL_METADATA, () -> Discovery.nodeIdSerializer, () -> ResponseVerbHandler.instance ),
TCM_DISCOVER_SURVEY_REQ(823, P0, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_SURVEY_RSP),

INITIATE_DATA_MOVEMENTS_RSP (814, P1, rpcTimeout, MISC, () -> NoPayload.serializer, RESPONSE_HANDLER ),
INITIATE_DATA_MOVEMENTS_REQ (815, P1, rpcTimeout, MISC, () -> DataMovement.serializer, () -> DataMovementVerbHandler.instance, INITIATE_DATA_MOVEMENTS_RSP ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ public List<Supplier<Future<?>>> repairPaxosForTopologyChangeAsync(String ksName
// are based on the system partitioner
EndpointsForRange endpoints = replication.isMeta()
? ClusterMetadata.current().fullCMSMembersAsReplicas()
: ClusterMetadata.current().placements.get(replication).reads.forRange(range).get();
: ClusterMetadata.current().placement(replication).reads.forRange(range).get();

Set<InetAddressAndPort> liveEndpoints = endpoints.filter(FailureDetector.isReplicaAlive).endpoints();
if (!PaxosRepair.hasSufficientLiveNodesForTopologyChange(keyspace, range, liveEndpoints))
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/service/Rebuild.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private static RangesAtEndpoint rangesForRebuildWithTokens(String tokens, String
private static MovementMap movementMap(ClusterMetadata metadata, String keyspace, String tokens)
{
MovementMap.Builder movementMapBuilder = MovementMap.builder();
DataPlacements placements = metadata.placements;
DataPlacements placements = metadata.placements();
if (keyspace == null)
{
placements.forEach((params, placement) -> movementMapBuilder.put(params, addMovementsForParams(placement, null)));
Expand Down
Loading