Skip to content

feat: resetOffsetsAndBackfill using bounded stream supervisor#19477

Open
aho135 wants to merge 12 commits into
apache:masterfrom
aho135:resetOffsetsAndBackfill-bounded-stream
Open

feat: resetOffsetsAndBackfill using bounded stream supervisor#19477
aho135 wants to merge 12 commits into
apache:masterfrom
aho135:resetOffsetsAndBackfill-bounded-stream

Conversation

@aho135
Copy link
Copy Markdown
Contributor

@aho135 aho135 commented May 18, 2026

This change adds an endpoint called resetOffsetsAndBackfill to SupervisorResource. This is a useful feature for operating Druid clusters where the most recent data is the most important (such as alerting use cases).

Description

Adds an endpoint called resetOffsetsAndBackfill to automatically ingest skipped data in the case where the offset is reset to latest. This requires useEarliestOffset=false, useConcurrentLocks=true because there can be conflicting time intervals between the backfill task and the main supervisor tasks and the Supervisor needs to be in a running state in order to call updatePartitionLagFromStream() to get the latest offsets

Release note

Adds an endpoint called resetOffsetsAndBackfill for Stream Supervisors. This is a useful feature for operating Druid clusters where the most recent data is the most important


Key changed/added classes in this PR
  • SupervisorResource
  • SupervisorManager

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@aho135 aho135 mentioned this pull request May 18, 2026
9 tasks
Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 2
P3 0
Total 3
Severity Findings
P0 0
P1 1
P2 2
P3 0
Total 3

Reviewed 11 of 11 changed files.


This is an automated review by Codex GPT-5.5


@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Kinesis cannot provide backfill end offsets

The new manager path calls updatePartitionLagFromStream() and then getLatestSequencesFromStream(), but Kinesis only updates time lag here and does not override getLatestSequencesFromStream(), so it inherits the base empty map. Any Kinesis supervisor that passes the earlier checks will fail with empty latest offsets instead of starting a backfill.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout, @jaykanakiya will be tackling Kinesis support in a separate PR

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for confirming. I reviewed all 11 changed files in the current head; since this PR still exposes /resetOffsetsAndBackfill to every SeekableStreamSupervisor, Kinesis still reaches SupervisorManager.resetSupervisorAndBackfill(), calls updatePartitionLagFromStream(), then sees the base empty getLatestSequencesFromStream() and returns a 500. If Kinesis support is intentionally deferred, can this PR explicitly reject or disable Kinesis and document the supported supervisor types, rather than leaving the new endpoint available but broken until the follow-up PR lands?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @FrankChen021! I'm not able to do an instanceof check in SupervisorManager due to module dependencies. I could add something like supportsResetAndBackfill to SeekableStreamSupervisor but seems like overkill given that it'll be reverted after getLatestSequencesFromStream() is implemented for Kinesis. Given that the current response of 500 is accurate I think it's fine for now

Alternatively we can hold off on merging this until the Kinesis changes are in

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think returning a 5xx that should be good enough

We could probably add a one-line limitation in the docs for completeness noting that the feature doesn’t apply to Kinesis supervisors ? That can be removed once that gap is addressed in a future patch depending on when it lands

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that sounds reasonable if Kinesis support is intentionally deferred. The current docs still say the endpoint applies to a streaming supervisor, and the same page describes streaming supervisors as Kafka or Kinesis, so Kinesis users would still be directed to an endpoint that is expected to 500. Reviewed 11 of 11 changed files; I think this just needs the explicit Kinesis limitation in the new docs section.

Copy link
Copy Markdown
Contributor

@abhishekrb19 abhishekrb19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @aho135! Left some comments

Comment thread docs/api-reference/supervisor-api.md Outdated
The following requirements must be met before calling this endpoint:

- The supervisor must be a `SeekableStreamSupervisor`.
- The supervisor's `useEarliestSequenceNumber` property must be `false`.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this requirement needed? With useConcurrentLocks and two parallel supervisors, I'd imagine this would still work if the offset ranges are overlapping

Copy link
Copy Markdown
Contributor Author

@aho135 aho135 May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is still a requirement even with separate supervisors. This is essentially multi-supervisor support which requires useConcurrentLocks, as the two supervisors can be writing to the same interval


@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think returning a 5xx that should be good enough

We could probably add a one-line limitation in the docs for completeness noting that the feature doesn’t apply to Kinesis supervisors ? That can be removed once that gap is addressed in a future patch depending on when it lands

}

@Test
public void testResetSupervisorAndBackfill() throws Exception
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tests @aho135! Do you think it’d be possible to extend the embedded test KafkaBoundedSupervisorTest you added to include some end-to-end coverage for this API as well?

Comment on lines +393 to +433
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");

Pair<Supervisor, SupervisorSpec> supervisorPair = supervisors.get(id);
if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
}
SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;

// Verify useEarliestOffset is false
if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
}

if (!specHasConcurrentLocks(streamSpec)) {
throw new IAE(
"Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
);
}

// We need an active recordSupplier to query the latest offsets from the stream
if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) {
throw new IAE("Supervisor[%s] must be in a RUNNING state to perform a reset and backfill", id);
}

log.info("Capturing latest offsets from stream for supervisor[%s]", id);
streamSupervisor.updatePartitionLagFromStream();
Map<?, ?> endOffsets = streamSupervisor.getLatestSequencesFromStream();

log.info("Capturing checkpointed offsets for supervisor[%s]", id);
Map<?, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();

// Validate that we successfully retrieved offsets
if (endOffsets == null || endOffsets.isEmpty()) {
throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
}
if (startOffsets == null || startOffsets.isEmpty()) {
throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It’ll likely be cleaner to move these validations into their own function

@github-actions github-actions Bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels May 21, 2026
Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 0
P3 0
Total 1
Severity Findings
P0 0
P1 1
P2 0
P3 0
Total 1

Reviewed 17 of 17 changed files.


This is an automated review by Codex GPT-5.5

endOffsets
);

streamSupervisor.resetOffsets(resetMetadata);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Avoid resetting the main supervisor with stale offsets

The main supervisor keeps ingesting while this method captures endOffsets/startOffsets, creates the backfill supervisor, and only then calls resetOffsets. If a running task checkpoints after the captured endOffsets but before this call, resetOffsetsInternal builds currentMetadata.plus(resetMetadata), and plus overwrites the newer partition value with the stale captured end offset. That rolls the main supervisor checkpoint backward and can reingest rows that were already published. The reset should be conditional on metadata still matching the captured range, retried with fresh offsets, or performed while the main supervisor is quiesced.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @FrankChen021 This should be a very rare scenario since I expect this endpoint only to be used if there is some lag built up. I did push a change to prefer the checkpointed offset if it is greater than the requested reset offset to handle this edge case

Copy link
Copy Markdown
Contributor Author

@aho135 aho135 May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue still stands that there can be duplicate ingestion with the backfill Supervisor if the main Supervisor checkpoints after startOffsets is captured. To completely remove this race I think we'd need to suspend the supervisor and wait for all tasks to gracefully shutdown. This is something that I was trying to avoid as then we would need to deal with timeouts and such. Let me know your thoughts on this one though

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding resetOffsetsForwardOnly; it handles the common case where the checkpoint has already advanced by the time the reset notice reads metadata. I still think the rollback race is not fully closed, though: resetOffsetsInternal reads current metadata, computes the max map, then calls resetDataSourceMetadata, and that storage method does an unconditional UPDATE ... WHERE dataSource = :dataSource. If a task publishes/checkpoints after the read but before that update, the max is computed from stale metadata and the reset can still overwrite the newer checkpoint. So I would keep this open unless the reset write is CAS/retried against the metadata it read, or the main supervisor is quiesced before capturing/resetting. I agree the backfill-overlap duplicate case is separate and requires either accepting/documenting possible duplicates or suspending the main supervisor. Reviewed 17 of 17 changed files.

Copy link
Copy Markdown
Contributor Author

@aho135 aho135 May 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @FrankChen021

I am leaning towards reverting the resetOffsetsForwardOnly changes. My reasoning is that "backfill-overlap duplicate case" is not solvable without suspending and task shutdown. I expect that this duplication scenario is more common than the one resetOffsetsForwardOnly addresses, since it would only make sense to use this endpoint in the case where there is some lag built up. If we're not solving that scenario and accepting that this endpoint has the potential for duplicate ingestion then I think it would make sense to just accept the limitation and document it rather than adding additional complexity to solve the even more rare duplication scenario.

Since many data streaming pipelines follow at least once ingestion and the duplication scenario is rare and bounded I think this is a reasonable trade-off to not have to deal with suspension and task shutdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants