CASSANALYTICS-160: Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes #214
… to determine appropriate stream session to use for bulk writes
| * | ||
| * @param schemaStr full cluster schema string as returned by Sidecar | ||
| * @param keyspace name of the keyspace to check | ||
| * @return {@code true} if keyspace is tracked {@code false} otherwise |
There was a problem hiding this comment.
This Javadoc is stale: extractReplicationType returns a string.
|
|
||
| @Override | ||
| public DirectStreamSession createStreamSession(BulkWriterContext writerContext, | ||
| String sessionId, |
There was a problem hiding this comment.
This indentation is more readable than the new one.
| { | ||
| if (bridge().isTracked(clusterInfo.getReplicationType())) | ||
| { | ||
| return new TrackedDirectStreamSession(writerContext, |
There was a problem hiding this comment.
Since we plan to implement this feature across multiple PRs and phases, can we add a feature flag, disabled by default, so that existing code is not affected by these changes? Developers can turn it on when they want to try it out, and CI can enable it to test the code. We can enable it by default once we reach a comfortable stage.
There was a problem hiding this comment.
The tracked stream session should not be used for existing use cases, since their keyspaces would not be tracked, and we cannot use the direct stream session when a keyspace is tracked. Hence a feature flag might not be necessary here.
There was a problem hiding this comment.
Also, we need to throw an unsupported exception here instead of waiting until TrackedDirectStreamSession methods are called.
There was a problem hiding this comment.
For any new feature, we should either develop it in a feature branch or keep it disabled behind a feature flag until it is complete and stable. For example, the mutation tracking feature in C* is in a feature branch.
In this PR especially, we are introducing an incomplete feature upstream, so a feature flag is very important to ensure that our new code does not inadvertently break existing use cases.
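To make the two proposals in this thread concrete (a disabled-by-default flag, plus failing fast instead of waiting for TrackedDirectStreamSession methods to be called), here is a minimal sketch. Everything in it is hypothetical: the class name, the `SessionKind` enum, and the `tracked_keyspace_writes_enabled` option key are illustrative only and do not come from this PR.

```java
import java.util.Map;

/** Hypothetical sketch; none of these names exist in the PR. */
final class StreamSessionSelector
{
    enum SessionKind { DIRECT, TRACKED_DIRECT }

    // Assumed option key, disabled unless explicitly set to "true"
    static final String TRACKED_WRITES_ENABLED = "tracked_keyspace_writes_enabled";

    static SessionKind select(String replicationType, Map<String, String> options)
    {
        // equalsIgnoreCase(null) is false, so a missing replication type is treated as untracked
        boolean tracked = "tracked".equalsIgnoreCase(replicationType);
        if (!tracked)
        {
            // Existing use cases keep the direct stream session regardless of the flag
            return SessionKind.DIRECT;
        }

        boolean enabled = Boolean.parseBoolean(options.getOrDefault(TRACKED_WRITES_ENABLED, "false"));
        if (!enabled)
        {
            // Fail fast at session creation rather than inside the session's methods
            throw new UnsupportedOperationException(
                "Bulk writes to tracked keyspaces are disabled; set " + TRACKED_WRITES_ENABLED + "=true to enable");
        }
        return SessionKind.TRACKED_DIRECT;
    }
}
```

With this shape, an untracked keyspace never touches the new code path, and a tracked keyspace is rejected up front unless the flag is turned on.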
| * replica independently streamed the data to its peers. | ||
| * <p> | ||
| */ | ||
| public class TrackedDirectStreamSession extends StreamSession<TransportContext.DirectDataBulkWriterContext> |
There was a problem hiding this comment.
Don't we need to implement a similar one for CloudStorageStreamSession as well? In non-coordinated mode, it calls tryCreateRestoreSlicePerReplica. My understanding is that if the destination Cassandra keyspace is tracked, we will end up writing to every replica instead of writing to the coordinator only.
In coordinated cloud storage mode, though analytics writes to one Sidecar, we need to verify whether the Sidecar/Cassandra restore code handles tracked keyspaces correctly.
There was a problem hiding this comment.
We don't have to implement everything in this PR, but we may need to create similar classes for these cases and file follow-up JIRAs so we don't miss them.
There was a problem hiding this comment.
Yes, we will create follow-up JIRAs for the S3 mode work.
| + "= { 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'dc1': '3' }" | ||
| + " AND DURABLE_WRITES = true AND replication_type = 'tracked';"; | ||
| assertThat(CqlUtils.extractReplicationType(schemaStrTracked, "k")).isEqualTo("tracked"); | ||
| String schemaStrTrackedCaseInsensitive |
There was a problem hiding this comment.
For better readability, can you please either create separate tests for these cases or separate them with blank lines?
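As an illustration of what "one case per check" could look like, here is a self-contained sketch. It uses a hypothetical regex-based stand-in, `ReplicationTypeSketch.extract`, in place of `CqlUtils.extractReplicationType`, whose actual implementation is not shown in this conversation. Returning the value exactly as written in the schema, and `null` when the option is absent, are assumptions of the sketch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for illustration; not the CqlUtils implementation. */
final class ReplicationTypeSketch
{
    // Matches: replication_type = '<value>' (option name matched case-insensitively)
    private static final Pattern REPLICATION_TYPE =
        Pattern.compile("replication_type\\s*=\\s*'([^']*)'", Pattern.CASE_INSENSITIVE);

    static String extract(String schema, String keyspace)
    {
        // Capture the options of the CREATE KEYSPACE statement for this keyspace, up to its ';'
        Pattern statement = Pattern.compile(
            "CREATE\\s+KEYSPACE\\s+\"?" + Pattern.quote(keyspace) + "\"?\\s+WITH\\b([^;]*);",
            Pattern.CASE_INSENSITIVE);
        Matcher stmt = statement.matcher(schema);
        if (!stmt.find())
        {
            return null; // keyspace not present in the schema (assumed behavior)
        }
        Matcher type = REPLICATION_TYPE.matcher(stmt.group(1));
        return type.find() ? type.group(1) : null; // value as written, or null if absent
    }

    // One small, separately named check per input keeps failures easy to read

    static boolean lowerCaseTrackedIsExtracted()
    {
        String schema = "CREATE KEYSPACE k WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'dc1': '3' }"
                        + " AND DURABLE_WRITES = true AND replication_type = 'tracked';";
        return "tracked".equals(extract(schema, "k"));
    }

    static boolean upperCaseOptionNameIsExtracted()
    {
        String schema = "CREATE KEYSPACE k WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'dc1': '3' }"
                        + " AND DURABLE_WRITES = true AND REPLICATION_TYPE = 'tracked';";
        return "tracked".equals(extract(schema, "k"));
    }

    static boolean missingOptionYieldsNull()
    {
        String schema = "CREATE KEYSPACE k WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'dc1': '3' }"
                        + " AND DURABLE_WRITES = true;";
        return extract(schema, "k") == null;
    }
}
```

In the real test class each of these would be its own test method, so a failure points directly at the input that broke.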
| @@ -1,5 +1,6 @@ | |||
| 0.5.0 | |||
| ----- | |||
| * Analytics should identify if a keyspace is tracked to determine appropriate stream session to use for bulk writes (CASSANALYTICS-160) | |||
There was a problem hiding this comment.
[Minor] Prefer a shorter and simpler statement.
| } | ||
|
|
||
| @Override | ||
| public String getReplicationType() |
There was a problem hiding this comment.
CassandraClusterInfoGroup does not override this, i.e., it doesn't detect tracked keyspaces.
| + " AND durable_writes = true" | ||
| + " AND replication_type = 'tracked';"; | ||
| CassandraClusterInfo ci = mockClusterInfoWithSchema("mykeyspace", schema); | ||
| assertThat(ci.getReplicationType()) |
There was a problem hiding this comment.
These tests are not testing the changes to CassandraClusterInfo, because MockClusterInfoForSchema overrides getReplicationType to call CqlUtils.
There was a problem hiding this comment.
Yes; otherwise we would need to mock the Sidecar client calls that fetch the schema, which this avoids.
There was a problem hiding this comment.
Can we have in-jvm tests to cover the CassandraClusterInfo code path then? Tests in CassandraClusterInfoTest.java should test CassandraClusterInfo. If that is difficult, we can have in-jvm tests that go through this class.
| return new SidecarDataTransferApi(clusterInfo.getCassandraContext(), bridge(), jobInfo); | ||
| } | ||
|
|
||
| private CassandraBridge bridge() |
There was a problem hiding this comment.
writerContext already has a bridge() method. Can you please reuse that?
| */ | ||
| public static boolean isTracked(String replicationType) | ||
| { | ||
| return TRACKED_REPLICATION_TYPE.equalsIgnoreCase(replicationType); |
There was a problem hiding this comment.
Here we are checking the replication_type value only, but the cluster needs to have mutation_tracking.enabled set to true for the feature to work on the Cassandra side. Can you please explore what happens in Cassandra if a keyspace was created with replication_type = tracked and admins later changed mutation_tracking.enabled to false? Based on the findings, we may need to check mutation_tracking.enabled as well.
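If the investigation shows that the cluster-level setting matters, the combined check could look roughly like the sketch below. This is only an assumption about the outcome: `isEffectivelyTracked` and its `mutationTrackingEnabled` parameter are hypothetical, and how analytics would learn the value of mutation_tracking.enabled is left open.

```java
/** Hypothetical sketch; only isTracked mirrors the code under review. */
final class TrackedKeyspaceCheck
{
    static final String TRACKED_REPLICATION_TYPE = "tracked";

    // Same check as in the diff: keyspace-level replication_type only
    static boolean isTracked(String replicationType)
    {
        return TRACKED_REPLICATION_TYPE.equalsIgnoreCase(replicationType);
    }

    // Assumed variant: also require the cluster-level mutation tracking setting.
    // The caller supplies mutationTrackingEnabled; its source is not defined here.
    static boolean isEffectivelyTracked(String replicationType, boolean mutationTrackingEnabled)
    {
        return mutationTrackingEnabled && isTracked(replicationType);
    }
}
```

Keeping the two checks separate lets existing callers of isTracked stay unchanged until the Cassandra behavior is confirmed.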