CEP-45: Incremental repair for mutation tracking #4696
aweisberg wants to merge 36 commits into apache:cep-45-mutation-tracking
Conversation
…dn't work in 2 minutes it won't work.
…tionTrackingIncrementalRepairTask
…ce shard for keyspace distributed_test_keyspace, but it already exists on bounce
…marking sstables repaired to effect the migration
…e it will hang, so just use IR instead
…ntirely inside migrated ranges or entirely outside, but not both
…een completed, use tryFailure instead
for (Shard shard : overlappingShards)
{
    ShardSyncState state = new ShardSyncState(shard, liveHostIds);
    shardStates.put(shard.range, state);
Trying to reason about the thread safety of shardStates here...
The assignment is clearly visible after the CAS above, but are the iterations inside the callbacks later guaranteed to see the results of the put()s here?
Registering the sync coordinator is a write barrier because it does a put into a ConcurrentHashMap? So any prior writes will be visible? As long as shardStates is effectively immutable after that, the map should be OK.
This could be an ImmutableMap, which might make it a little clearer, so I'll make that change.
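To make the visibility reasoning concrete, here is a minimal, self-contained sketch; the class and key names are hypothetical stand-ins for shardStates and the coordinator registry. The map is fully built, wrapped as unmodifiable (standing in for ImmutableMap), and only then published through a ConcurrentHashMap.put(). The put acts as the release and the reader's get() as the acquire, so the earlier plain-map writes are guaranteed visible to callbacks that read through the registry.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PublishViaConcurrentMap
{
    // Stand-in for the registry the sync coordinator is put into.
    static final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException
    {
        // Build the per-shard state map fully before publishing it.
        Map<String, String> shardStates = new HashMap<>();
        shardStates.put("range-1", "state-1");
        shardStates.put("range-2", "state-2");

        // Effectively immutable after publication (ImmutableMap in the
        // actual change; unmodifiableMap here).
        registry.put("coordinator-1", Collections.unmodifiableMap(shardStates));

        Thread callback = new Thread(() -> {
            // This get() pairs with the put() above, so all writes made to
            // shardStates before publication are visible on this thread.
            Map<String, String> seen = registry.get("coordinator-1");
            System.out.println(seen.get("range-1") + "," + seen.get("range-2"));
        });
        callback.start();
        callback.join();
    }
}
```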
finally
{
    if (!allSucceeded)
        syncCoordinator.cancel();
What happens if the try block above produces InterruptedException? Do we need to cancel the rest of the sync coordinators (that hadn't been processed yet)?
We should clean them up so they don't have to wait for their timeout to elapse. I'll rework the exception handling here to catch Exception instead of RuntimeException.
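A runnable sketch of that rework, with a hypothetical SyncCoordinator interface standing in for the real coordinators: catch Exception (so InterruptedException is covered, not just RuntimeException) and cancel every coordinator in the finally block when anything failed, rather than leaving the unprocessed ones to wait out their timeouts. It assumes cancel() is safe to call on already-completed coordinators.

```java
import java.util.ArrayList;
import java.util.List;

public class CancelRemainingSketch
{
    // Hypothetical stand-in for the real sync coordinator type.
    interface SyncCoordinator { void await() throws Exception; void cancel(); }

    static final List<String> log = new ArrayList<>();

    static void awaitAll(List<SyncCoordinator> coordinators)
    {
        boolean allSucceeded = false;
        try
        {
            for (SyncCoordinator c : coordinators)
                c.await();
            allSucceeded = true;
        }
        catch (Exception e) // Exception, not RuntimeException: covers InterruptedException too
        {
            // A real implementation would also restore the interrupt status here.
            log.add("failed: " + e.getMessage());
        }
        finally
        {
            if (!allSucceeded)
                for (SyncCoordinator c : coordinators)
                    c.cancel(); // assumed idempotent for completed coordinators
        }
    }

    public static void main(String[] args)
    {
        SyncCoordinator ok = new SyncCoordinator() {
            public void await() { log.add("ok awaited"); }
            public void cancel() { log.add("ok cancelled"); }
        };
        SyncCoordinator bad = new SyncCoordinator() {
            public void await() throws Exception { throw new InterruptedException("interrupted"); }
            public void cancel() { log.add("bad cancelled"); }
        };
        awaitAll(List.of(ok, bad));
        System.out.println(String.join(";", log));
    }
}
```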
CLUSTER.get(1).nodetoolResult("repair", specification.keyspaceName()).asserts().success();
// Background reconciliation doesn't exist/work so incremental repair just hangs waiting for reconciliation that never occurs
if (specification.replicationType.isTracked())
    CLUSTER.get(1).nodetoolResult("repair", "-full", specification.keyspaceName()).asserts().success();
Should an incremental repair request succeed after a successful full repair? I tried this, and it appears to hang, but I'm not sure why yet...
"node1_Repair#4:1" #270 daemon prio=5 os_prio=31 cpu=0.08ms elapsed=92.27s tid=0x000000013de75200 nid=0x2530b waiting on condition [0x000000036944a000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@11.0.19/Native Method)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.19/LockSupport.java:357)
at org.apache.cassandra.utils.concurrent.AsyncFuture.awaitUntil(AsyncFuture.java:221)
at org.apache.cassandra.utils.concurrent.Awaitable$Defaults.await(Awaitable.java:114)
at org.apache.cassandra.utils.concurrent.AbstractFuture.await(AbstractFuture.java:482)
at org.apache.cassandra.utils.concurrent.AbstractFuture.get(AbstractFuture.java:252)
at org.apache.cassandra.replication.MutationTrackingSyncCoordinator.awaitCompletion(MutationTrackingSyncCoordinator.java:351)
at org.apache.cassandra.repair.MutationTrackingIncrementalRepairTask.waitForSyncCompletion(MutationTrackingIncrementalRepairTask.java:127)
Anyway, I think the lack of background reconciliation still means that this won't work. The transfer IDs are only there to make sure read reconciliation works.
I misunderstood the test. I don't think IR should hang in this test because we aren't relying on background reconciliation. There aren't any down nodes at all.
Ah, right, now I remember. The test inserts data using executeInternal, which gives the mutation an id and applies it locally correctly, but because it's only applied locally it never propagates, since there is no background reconciliation.
Mutations applied via execute/StorageProxy are given to ActiveLogReconciler which is basically in-memory hinted handoff for mutation tracking.
So this is working as intended for now in that we need to use full repair here instead of IR since IR can't complete until background reconciliation is done.
public int hashCode()
{
    return Objects.hash(desc, offsetsByShard);
}
nit: Do we ever actually put MutationTrackingSyncResponse in a collection?
I'll remove hashCode and equals. I ran the tests and I don't think they get used anymore.
It's needed for RepairMessageSerializationsTest
{
    logger.warn("Mutation tracking sync failed for keyspace {}", keyspace, error);
    resultPromise.tryFailure(error);
    return;
nit: Coverage tooling indicates this might not be tested.
I'll add a test that allows the timeout to elapse.
Ah, the test that is supposed to test timeouts blocks all verbs, so it times out on the prepare rather than on the actual sync. I'll fix that test.
catch (RuntimeException e)
{
    allSucceeded = false;
    error = Throwables.merge(error, e);
nit: Coverage tooling indicates this might not be tested.
I'll convert timeouts to exceptions so it can be exercised.
catch (Exception e)
{
    logger.error("Error during mutation tracking repair", e);
    resultPromise.tryFailure(e);
nit: Coverage tooling indicates this might not be tested.
The errors are put in resultPromise and don't get surfaced by allowing them to bubble up. To make this branch fire, I could let exceptions bubble up and have them handled here. That would mean less exception handling in general, and then this would show up as tested.
I'll do that.
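A sketch of that bubble-up rework, using hypothetical names and CompletableFuture in place of Cassandra's AsyncPromise: the inner steps throw instead of failing the promise themselves, and a single outer catch records the failure, which makes this branch straightforward to exercise in a test.

```java
import java.util.concurrent.CompletableFuture;

public class BubbleUpSketch
{
    // Hypothetical stand-ins: prepare() and sync() represent repair steps
    // that now throw rather than calling resultPromise.tryFailure() themselves.
    static void runRepair(CompletableFuture<String> resultPromise, boolean failSync)
    {
        try
        {
            prepare();
            sync(failSync);
            resultPromise.complete("done");
        }
        catch (Exception e)
        {
            // The one place failures are recorded.
            resultPromise.completeExceptionally(e);
        }
    }

    static void prepare() {}

    static void sync(boolean fail) throws Exception
    {
        if (fail)
            throw new Exception("sync timed out");
    }

    public static void main(String[] args)
    {
        CompletableFuture<String> ok = new CompletableFuture<>();
        runRepair(ok, false);
        System.out.println(ok.join());

        CompletableFuture<String> failed = new CompletableFuture<>();
        runRepair(failed, true);
        System.out.println(failed.isCompletedExceptionally());
    }
}
```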
if (allRanges.isEmpty())
{
    logger.info("No common ranges to repair for keyspace {}", keyspace);
    return new AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(), List.of()));
nit: Coverage tooling indicates this might not be tested.
if (overlappingShards.isEmpty())
{
    completionFuture.setSuccess(null);
    return;
nit: Coverage tooling indicates this might not be tested.
public void onFailure(InetAddressAndPort from, RequestFailure failure)
{
    fail(new RuntimeException(
        String.format("Mutation tracking sync failed: participant %s returned failure %s", from, failure.reason)));
nit: Coverage tooling indicates this might not be tested.
Shard currentShard = getCurrentShard(state.shard.range);
if (currentShard != state.shard)
{
    failWithTopologyChange();
nit: Coverage tooling indicates this might not be tested.
Topology changes aren't supported yet: https://issues.apache.org/jira/browse/CASSANDRA-20386
I'll take a look and see if I can at least induce one to exercise this failure path.
It might end up being more of a unit test than an end-to-end test.
* their current witnessed offsets. This establishes a happens-before relationship: the
* participant's response contains offsets captured after receiving this request, which is
* sent after the repair starts.
*
nit: use * <p> instead of the bare * line above.
inMigrationPendingRange = migrationInfo.isRangeInPendingMigration(metadata().id,
                                                                  first.getToken(),
                                                                  last.getToken());
}
nit: Could replace the above with:
KeyspaceMigrationInfo migrationInfo = ClusterMetadata.current().mutationTrackingMigrationState.getKeyspaceInfo(metadata().keyspace);
boolean inMigrationPendingRange = migrationInfo != null && migrationInfo.isRangeInPendingMigration(metadata().id, first.getToken(), last.getToken());
// when incremental repair streams SSTables that were written before tracking was enabled.
Preconditions.checkState(!cfstore.metadata().replicationType().isTracked()
                         || ClusterMetadata.current().mutationTrackingMigrationState
                            .getKeyspaceInfo(cfstore.metadata().keyspace) != null);
nit: Might be nice to have something like an isMigrating(String) on MTMS, but just a matter of taste I guess.
I'll update it to use a helper.
{
    Preconditions.checkState(!cfstore.metadata().replicationType().isTracked());
    // Tracked tables may legitimately use this path during migration from untracked to tracked,
    // when incremental repair streams SSTables that were written before tracking was enabled.
Does this mean that during migration to tracked, we'd expect these SSTables to have no coordinator log offsets then? Is that worth asserting?
It shouldn't matter for imports, since the keyspace being currently tracked means we'll avoid this method.
No, we will actually hit this method during migration. The sstables might actually have offsets in them, since tracked writes have already started and the incremental repair starts after.
// flag on the mutation hasn't been set yet at this point — it's set later in
// applyMutation() — so we check the handler type instead.
if (this instanceof ReadRepairVerbHandler)
    return metadata;
nit: I guess the other option would be something like a handlesReadRepair() method that only ReadRepairVerbHandler overrides, but it's literally called ReadRepairVerbHandler, and we probably won't have something else handle RR mutations.
In any case, I'm remembering blocking RR is going to be reworked for migration anyway, so ignore me :D
@Nonnull Collection<String> columnFamilies)
{
    Iterable<TableMetadata> tables;
    if (!columnFamilies.isEmpty())
nit: This is almost a case where null would be nice to indicate "all tables", in the sense that an empty collection might be more likely than null to indicate incorrect argument construction.
RepairTask task = new PreviewRepairTask(this, state.id, neighborsAndRanges.filterCommonRanges(state.keyspace, cfnames), neighborsAndRanges.shouldExcludeDeadParticipants, cfnames);
return task.perform(executor, validationScheduler)
           .<Pair<CoordinatedRepairResult, Supplier<String>>>map(r -> Pair.create(r, task::successMessage))
           .addCallback((s, f) -> executor.shutdown());
This block here is duplicated 3 more times below. The original code here avoided that by returning after the if/else stuff, but we could just delegate to a submitRepairTask() or something similar.
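The suggested submitRepairTask() helper could look something like the following, simplified to runnable stand-ins: RepairTask, Pair, and the result types here are hypothetical, and CompletableFuture replaces the internal future type, so only the shape of the refactor is shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class SubmitRepairTaskSketch
{
    // Hypothetical stand-in for the real RepairTask.
    interface RepairTask
    {
        CompletableFuture<String> perform();
        default String successMessage() { return "repair succeeded"; }
    }

    // Hypothetical stand-in for Cassandra's Pair.
    record Pair<A, B>(A left, B right) {}

    // The helper the duplicated blocks would delegate to: perform the task,
    // pair the result with its success message, and run cleanup afterwards.
    static CompletableFuture<Pair<String, Supplier<String>>> submitRepairTask(RepairTask task)
    {
        return task.perform()
                   .thenApply(r -> new Pair<String, Supplier<String>>(r, task::successMessage))
                   .whenComplete((s, f) -> { /* executor.shutdown() would go here */ });
    }

    public static void main(String[] args)
    {
        RepairTask preview = () -> CompletableFuture.completedFuture("preview-result");
        Pair<String, Supplier<String>> p = submitRepairTask(preview).join();
        System.out.println(p.left() + ":" + p.right().get());
    }
}
```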
RepairJobDesc desc = new RepairJobDesc(parentSession, TimeUUID.Generator.nextTimeUUID(),
                                       keyspace, "Mutation Tracking Sync", List.of(range));
MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(
    coordinator.ctx, desc, commonRange.endpoints, metadata);
MutationTrackingSyncCoordinator syncCoordinator =
new MutationTrackingSyncCoordinator(coordinator.ctx, desc, commonRange.endpoints, metadata);
...might be a little easier on the eyes.
Pair<CoordinatedRepairResult, Supplier<String>> irPair = Pair.create(irResult, incrementalTask::successMessage);
mtTask.perform(executor, validationScheduler)
      .addCallback(
          mtResult -> result.trySuccess(irPair),
Do we need to handle partial failure here? (i.e. Do we just return the irPair result if the MT task partially fails?)
* Determines if this keyspace should use mutation tracking incremental repair.
* Returns true if:
* - Keyspace uses mutation tracking replication, OR
* - Keyspace is currently migrating (either direction)
nit: Not strictly true if migrating to untracked?
for (Range<Token> range : commonRange.ranges)
{
    RepairJobDesc desc = new RepairJobDesc(parentSession, TimeUUID.Generator.nextTimeUUID(),
                                           keyspace, "Mutation Tracking Sync", List.of(range));
Table name is meaningless here, right?
Yes, but I figured for debugging purposes it's clearer not to leave it empty.
if (overlappingShards.isEmpty())
{
    completionFuture.setSuccess(null);
nit: Might be nice to have a DEBUG level log message to indicate this happened.
}
// Always include the local node
liveHostIds.add(metadata.directory.peerId(ctx.broadcastAddressAndPort()).id());
}
nit: If we just build the liveHostIds at construction time, could we make it final?
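A small sketch of that suggestion, with hypothetical names: build liveHostIds entirely in the constructor (including the local node) and assign it to a final field, wrapping it unmodifiable so it cannot drift afterwards.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class FinalLiveHostIdsSketch
{
    // Final: fully built at construction time, never mutated afterwards.
    final Set<Integer> liveHostIds;

    // livePeerIds / localHostId stand in for the failure-detector and
    // broadcast-address lookups in the real constructor.
    FinalLiveHostIdsSketch(Set<Integer> livePeerIds, int localHostId)
    {
        Set<Integer> ids = new HashSet<>(livePeerIds);
        ids.add(localHostId); // always include the local node
        this.liveHostIds = Collections.unmodifiableSet(ids);
    }

    public static void main(String[] args)
    {
        FinalLiveHostIdsSketch c = new FinalLiveHostIdsSketch(Set.of(2, 3), 1);
        System.out.println(new TreeSet<>(c.liveHostIds)); // sorted for stable output
    }
}
```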
if (completionFuture.isDone())
    return;

recaptureTargets();
It looks like this is called from updateReplicatedOffsets(), but does that mean we keep expanding the targets after the initial round of sync requests? (i.e. If there are ongoing writes, can this cause the whole IR to time out?)
No description provided.