Skip to content

CASSANALYTICS-160: Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes#214

Open
sarankk wants to merge 1 commit into
apache:trunkfrom
sarankk:CASSANALYTICS-160
Open

CASSANALYTICS-160: Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes#214
sarankk wants to merge 1 commit into
apache:trunkfrom
sarankk:CASSANALYTICS-160

Conversation

@sarankk

@sarankk sarankk commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

No description provided.

… to determine appropriate stream session to use for bulk writes
*
* @param schemaStr full cluster schema string as returned by Sidecar
* @param keyspace name of the keyspace to check
* @return {@code true} if keyspace is tracked {@code false} otherwise

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is stale, extractReplicationType returning a string


@Override
public DirectStreamSession createStreamSession(BulkWriterContext writerContext,
String sessionId,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This indentation is better readable than the new one

{
if (bridge().isTracked(clusterInfo.getReplicationType()))
{
return new TrackedDirectStreamSession(writerContext,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we plan to implement the feature in multiple PRs and phases, can we have a feature flag, set to disabled by default, so the existing code doesn't get impacted by this feature changes by any chance. And developers can turn it on when they like to try it out. CI can enable the feature flag to test the code. We can set it enabled by default when we reach a comfortable stage.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tracked stream session should not be used for existing use cases since keyspace would not be tracked and we cannot use direct stream session when keyspace is tracked. Hence feature flag might not be necessary here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we need to throw unsupported here instead of waiting until TrackedDirectStreamSession methods are called

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For any new feature, we should either develop them in a feature branch or disable it under a feature flag until it is complete and stable. For example, mutation tracking feature in C* is in a feature branch.
Especially in this PR, we are introducing an incomplete feature to the upstream. So having a feature flag is very important to ensure existing use cases are not broken by any chance with our new code inadvertently.

* replica independently streamed the data to its peers.
* <p>
*/
public class TrackedDirectStreamSession extends StreamSession<TransportContext.DirectDataBulkWriterContext>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to implement similar one for CloudStorageStreamSession as well? In non-coordinated mode, it calls tryCreateRestoreSlicePerReplica. My understanding is, if the destination Cassandra keyspace is tracked, we will end up writing to every replica instead of writing to coordinator only.

Incase of coordinated cloud storage mode, though analytics writes to one sidecar, we need to verify does Sidecar/Cassandra restore code is handling tracked keyspaces correctly or not.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to implement everything in this PR. But we may need to create similar class for these cases and follow up JIRAs not to miss them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we will create follow up JIRAs for S3 mode work

+ "= { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'dc1': '3' }"
+ " AND DURABLE_WRITES = true AND replication_type = 'tracked';";
assertThat(CqlUtils.extractReplicationType(schemaStrTracked, "k")).isEqualTo("tracked");
String schemaStrTrackedCaseInsensitive

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For better readability, can you please either create separate tests for these or separate them with new lines.

Comment thread CHANGES.txt
@@ -1,5 +1,6 @@
0.5.0
-----
* Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes (CASSANALYTICS-160)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] Prefer shorter and simpler statement

}

@Override
public String getReplicationType()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CassandraClusterInfoGroup not overriding this, i.e, doesn't detect tracked keyspaces

+ " AND durable_writes = true"
+ " AND replication_type = 'tracked';";
CassandraClusterInfo ci = mockClusterInfoWithSchema("mykeyspace", schema);
assertThat(ci.getReplicationType())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are not testing changes to CassandraClusterInfo, because MockClusterInfoForSchema overriding getReplicationType to call CqlUtils.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes otherwise we need to mock sidecar client calls to get schema, avoiding that

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have in-jvm tests to test CassandraClusterInfo code path then? Tests in CassandraClusterInfoTest.java‎ should test CassandraClusterInfo. If it is difficult, we can have in-jvm tests which go through this class.

return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge(), jobInfo);
}

private CassandraBridge bridge()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writerContext already has bridge() method. can you please reuse that

*/
public static boolean isTracked(String replicationType)
{
return TRACKED_REPLICATION_TYPE.equalsIgnoreCase(replicationType);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are checking for replication_type value only. But the cluster need to have mutation_tracking.enabled to true for the feature to work at Cassandra side. Can you please explore what happens in Cassandra if a keyspace was created with replication_type = tracked, but then later admins changed mutation_tracking.enabled to false? Based on these findings we may need to check for mutation_tracking.enabled as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants