feat: resetOffsetsAndBackfill using bounded stream supervisor#19477
feat: resetOffsetsAndBackfill using bounded stream supervisor#19477aho135 wants to merge 12 commits into
Conversation
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 2 |
| P3 | 0 |
| Total | 3 |
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 2 |
| P3 | 0 |
| Total | 3 |
Reviewed 11 of 11 changed files.
This is an automated review by Codex GPT-5.5
|
|
||
| @Override | ||
| protected void updatePartitionLagFromStream() | ||
| public void updatePartitionLagFromStream() |
There was a problem hiding this comment.
[P2] Kinesis cannot provide backfill end offsets
The new manager path calls updatePartitionLagFromStream() and then getLatestSequencesFromStream(), but Kinesis only updates time lag here and does not override getLatestSequencesFromStream(), so it inherits the base empty map. Any Kinesis supervisor that passes the earlier checks will fail with empty latest offsets instead of starting a backfill.
There was a problem hiding this comment.
Good callout, @jaykanakiya will be tackling Kinesis support in a separate PR
There was a problem hiding this comment.
Thanks for confirming. I reviewed all 11 changed files in the current head; since this PR still exposes /resetOffsetsAndBackfill to every SeekableStreamSupervisor, Kinesis still reaches SupervisorManager.resetSupervisorAndBackfill(), calls updatePartitionLagFromStream(), then sees the base empty getLatestSequencesFromStream() and returns a 500. If Kinesis support is intentionally deferred, can this PR explicitly reject or disable Kinesis and document the supported supervisor types, rather than leaving the new endpoint available but broken until the follow-up PR lands?
There was a problem hiding this comment.
Thanks for the review @FrankChen021! I'm not able to do an instanceof check in SupervisorManager due to module dependencies. I could add something like supportsResetAndBackfill to SeekableStreamSupervisor but seems like overkill given that it'll be reverted after getLatestSequencesFromStream() is implemented for Kinesis. Given that the current response of 500 is accurate I think it's fine for now
Alternatively we can hold off on merging this until the Kinesis changes are in
There was a problem hiding this comment.
Yeah, I think returning a 5xx that should be good enough
We could probably add a one-line limitation in the docs for completeness noting that the feature doesn’t apply to Kinesis supervisors ? That can be removed once that gap is addressed in a future patch depending on when it lands
There was a problem hiding this comment.
Thanks, that sounds reasonable if Kinesis support is intentionally deferred. The current docs still say the endpoint applies to a streaming supervisor, and the same page describes streaming supervisors as Kafka or Kinesis, so Kinesis users would still be directed to an endpoint that is expected to 500. Reviewed 11 of 11 changed files; I think this just needs the explicit Kinesis limitation in the new docs section.
abhishekrb19
left a comment
There was a problem hiding this comment.
Thanks @aho135! Left some comments
| The following requirements must be met before calling this endpoint: | ||
|
|
||
| - The supervisor must be a `SeekableStreamSupervisor`. | ||
| - The supervisor's `useEarliestSequenceNumber` property must be `false`. |
There was a problem hiding this comment.
Is this requirement needed? With useConcurrentLocks and two parallel supervisors, I'd imagine this would still work if the offset ranges are overlapping
There was a problem hiding this comment.
Yeah this is still a requirement even with separate supervisors. This is essentially multi-supervisor support which requires useConcurrentLocks, as the two supervisors can be writing to the same interval
|
|
||
| @Override | ||
| protected void updatePartitionLagFromStream() | ||
| public void updatePartitionLagFromStream() |
There was a problem hiding this comment.
Yeah, I think returning a 5xx that should be good enough
We could probably add a one-line limitation in the docs for completeness noting that the feature doesn’t apply to Kinesis supervisors ? That can be removed once that gap is addressed in a future patch depending on when it lands
| } | ||
|
|
||
| @Test | ||
| public void testResetSupervisorAndBackfill() throws Exception |
There was a problem hiding this comment.
Thanks for the tests @aho135! Do you think it’d be possible to extend the embedded test KafkaBoundedSupervisorTest you added to include some end-to-end coverage for this API as well?
| Preconditions.checkState(started, "SupervisorManager not started"); | ||
| Preconditions.checkNotNull(id, "id"); | ||
|
|
||
| Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id); | ||
| if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) { | ||
| throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id); | ||
| } | ||
| SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs; | ||
| SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs; | ||
|
|
||
| // Verify useEarliestOffset is false | ||
| if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) { | ||
| throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true."); | ||
| } | ||
|
|
||
| if (!specHasConcurrentLocks(streamSpec)) { | ||
| throw new IAE( | ||
| "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks" | ||
| ); | ||
| } | ||
|
|
||
| // We need an active recordSupplier to query the latest offsets from the stream | ||
| if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) { | ||
| throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id); | ||
| } | ||
|
|
||
| log.info("Capturing latest offsets from stream for supervisor[%s]", id); | ||
| streamSupervisor.updatePartitionLagFromStream(); | ||
| Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream(); | ||
|
|
||
| log.info("Capturing checkpointed offsets for supervisor[%s]", id); | ||
| Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage(); | ||
|
|
||
| // Validate that we successfully retrieved offsets | ||
| if (endOffsets == null || endOffsets.isEmpty()) { | ||
| throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id); | ||
| } | ||
| if (startOffsets == null || startOffsets.isEmpty()) { | ||
| throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id); | ||
| } | ||
|
|
There was a problem hiding this comment.
nit: It’ll likely be cleaner to move these validations into their own function
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
Reviewed 17 of 17 changed files.
This is an automated review by Codex GPT-5.5
| endOffsets | ||
| ); | ||
|
|
||
| streamSupervisor.resetOffsets(resetMetadata); |
There was a problem hiding this comment.
[P1] Avoid resetting the main supervisor with stale offsets
The main supervisor keeps ingesting while this method captures endOffsets/startOffsets, creates the backfill supervisor, and only then calls resetOffsets. If a running task checkpoints after the captured endOffsets but before this call, resetOffsetsInternal builds currentMetadata.plus(resetMetadata), and plus overwrites the newer partition value with the stale captured end offset. That rolls the main supervisor checkpoint backward and can reingest rows that were already published. The reset should be conditional on metadata still matching the captured range, retried with fresh offsets, or performed while the main supervisor is quiesced.
There was a problem hiding this comment.
Thanks @FrankChen021 This should be a very rare scenario since I expect this endpoint only to be used if there is some lag built up. I did push a change to prefer the checkpointed offset if it is greater than the requested reset offset to handle this edge case
There was a problem hiding this comment.
The issue still stands that there can be duplicate ingestion with the backfill Supervisor if the main Supervisor checkpoints after startOffsets is captured. To completely remove this race I think we'd need to suspend the supervisor and wait for all tasks to gracefully shutdown. This is something that I was trying to avoid as then we would need to deal with timeouts and such. Let me know your thoughts on this one though
There was a problem hiding this comment.
Thanks for adding resetOffsetsForwardOnly; it handles the common case where the checkpoint has already advanced by the time the reset notice reads metadata. I still think the rollback race is not fully closed, though: resetOffsetsInternal reads current metadata, computes the max map, then calls resetDataSourceMetadata, and that storage method does an unconditional UPDATE ... WHERE dataSource = :dataSource. If a task publishes/checkpoints after the read but before that update, the max is computed from stale metadata and the reset can still overwrite the newer checkpoint. So I would keep this open unless the reset write is CAS/retried against the metadata it read, or the main supervisor is quiesced before capturing/resetting. I agree the backfill-overlap duplicate case is separate and requires either accepting/documenting possible duplicates or suspending the main supervisor. Reviewed 17 of 17 changed files.
There was a problem hiding this comment.
Thanks for the review @FrankChen021
I am leaning towards reverting the resetOffsetsForwardOnly changes. My reasoning is that "backfill-overlap duplicate case" is not solvable without suspending and task shutdown. I expect that this duplication scenario is more common than the one resetOffsetsForwardOnly addresses, since it would only make sense to use this endpoint in the case where there is some lag built up. If we're not solving that scenario and accepting that this endpoint has the potential for duplicate ingestion then I think it would make sense to just accept the limitation and document it rather than adding additional complexity to solve the even more rare duplication scenario.
Since many data streaming pipelines follow at least once ingestion and the duplication scenario is rare and bounded I think this is a reasonable trade-off to not have to deal with suspension and task shutdown
This change adds an endpoint called resetOffsetsAndBackfill to SupervisorResource. This is a useful feature for operating Druid clusters where the most recent data is the most important (such as alerting use cases).
Description
Adds an endpoint called resetOffsetsAndBackfill to automatically ingest skipped data in the case where the offset is reset to latest. This requires useEarliestOffset=false, useConcurrentLocks=true because there can be conflicting time intervals between the backfill task and the main supervisor tasks and the Supervisor needs to be in a running state in order to call updatePartitionLagFromStream() to get the latest offsets
Release note
Adds an endpoint called
resetOffsetsAndBackfillfor Stream Supervisors. This is a useful feature for operating Druid clusters where the most recent data is the most importantKey changed/added classes in this PR
SupervisorResourceSupervisorManagerThis PR has: