CFP Time based checkpoint strategy #48780

Open
dibahlfi wants to merge 2 commits into main from users/dibahl/cfp-checkpoint-strategy

@dibahlfi
Member

By default, CFP writes a checkpoint after every single batch. This is the safest behavior — the replay window is at most one batch — but every checkpoint is a write that costs RUs on the lease container.

The problem: In high-throughput workloads, the lease container can consume significant RUs just for checkpoint writes. Customers asked for a way to checkpoint less often while still bounding the replay window to a predictable duration.
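To make the trade-off concrete, here is a back-of-the-envelope sketch. The batch rate (5 batches per second) and the 30-second interval are hypothetical numbers chosen for illustration, not measurements from this PR. The sketch counts checkpoint writes per lease rather than RUs, because the RU cost of a single lease write is not stated here.

```java
import java.time.Duration;

public class CheckpointWriteEstimate {
    /** Checkpoint writes per lease over the window when every batch is checkpointed. */
    public static long everyBatchWrites(double batchesPerSecond, Duration window) {
        return Math.round(batchesPerSecond * window.getSeconds());
    }

    /** Upper bound on writes per lease when checkpointing at most once per interval. */
    public static long timeIntervalWrites(Duration interval, Duration window) {
        return window.getSeconds() / interval.getSeconds();
    }

    public static void main(String[] args) {
        Duration oneHour = Duration.ofHours(1);
        // Assumed workload: 5 batches per second on one lease.
        System.out.println(everyBatchWrites(5.0, oneHour));                      // 18000
        // Assumed strategy: one checkpoint every 30 seconds.
        System.out.println(timeIntervalWrites(Duration.ofSeconds(30), oneHour)); // 120
    }
}
```

Under these assumed numbers the interval strategy issues far fewer lease writes, at the price of a replay window of up to 30 seconds instead of one batch.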

Copilot AI review requested due to automatic review settings April 11, 2026 18:25
@dibahlfi dibahlfi requested review from a team and kirankumarkolli as code owners April 11, 2026 18:25
Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a configurable checkpointing strategy for the Cosmos Change Feed Processor (CFP) to reduce lease-container RU consumption by checkpointing less frequently while aiming to bound the replay window by time.

Changes:

  • Adds public checkpoint strategy types (EveryBatchCheckpointStrategy, TimeIntervalCheckpointStrategy) and wires them into ChangeFeedProcessorOptions.
  • Introduces an internal CheckpointFrequencyFactory to map public strategy choices to existing internal CheckpointFrequency.
  • Updates AutoCheckpointer to support time-interval-based checkpointing via a background timer, with new unit tests covering interval behavior and concurrency edge cases.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TimeIntervalCheckpointStrategy.java | New public strategy type for time-bounded checkpointing. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/EveryBatchCheckpointStrategy.java | New public strategy type representing “checkpoint every batch”. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedCheckpointStrategy.java | New public base type for checkpoint strategies. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java | Adds checkpoint strategy option (setter/getter). |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedProcessorBuilder.java | Validates time-interval strategy against lease expiration interval. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/CheckpointFrequencyFactory.java | Maps public strategies to internal CheckpointFrequency. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/AutoCheckpointer.java | Implements interval-based background checkpointing and single-flight checkpointing. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java | Uses CheckpointFrequencyFactory instead of default CheckpointFrequency. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java | Uses CheckpointFrequencyFactory instead of default CheckpointFrequency. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/common/AutoCheckpointerTests.java | New unit tests for interval checkpointing behavior. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ChangeFeedCheckpointStrategyTests.java | New unit tests for public strategy/options validation. |

Comment on lines +113 to +120
private void startIntervalCheckpointing() {
    Duration interval = this.checkpointFrequency.getTimeInterval();
    if (interval == null) {
        return;
    }

    this.intervalCheckpointDisposable = Flux.interval(interval, interval)
        .concatMap(ignored -> this.checkpointIfIntervalElapsed()
Copilot AI Apr 11, 2026

The interval checkpointing uses Flux.interval(interval, interval) without specifying a Scheduler, so it will run on Reactor’s default timer/parallel scheduler rather than the ChangeFeedProcessorOptions scheduler. This can surprise users who set a custom scheduler and also creates one periodic task per lease even when there is no uncheckpointed progress. Consider injecting/propagating the CFP scheduler into AutoCheckpointer and using it for the interval, and optionally starting/stopping the timer only while hasUncheckpointedProgress is true to reduce background overhead.

Copilot uses AI. Check for mistakes.
Comment on lines 62 to 66
@Override
public void close(ChangeFeedObserverContext<T> context, ChangeFeedObserverCloseReason reason) {
    this.stopIntervalCheckpointing();
    this.observer.close(context, reason);
}
Copilot AI Apr 11, 2026

close(...) disposes the interval stream, but an interval tick (or an in-flight checkpointOnce) can race with close and still call context.checkpoint() after the observer has been closed. To avoid checkpointing after shutdown/lease loss, consider adding a closed flag (set in close) and checking it in checkpointIfIntervalElapsed/checkpointOnce, and/or cancelling any in-flight checkpoint subscription on close if that’s safe.
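A minimal, dependency-free sketch of the closed-flag idea follows. The class and method names are hypothetical and a plain Runnable stands in for context.checkpoint(); this is not the AutoCheckpointer code from the PR. Note that a flag check alone only narrows the race: a tick that has already passed the check can still complete its write after close() returns.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ClosedFlagSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Runnable checkpointAction; // stand-in for context.checkpoint()

    public ClosedFlagSketch(Runnable checkpointAction) {
        this.checkpointAction = checkpointAction;
    }

    /** Called on every timer tick; returns true if a checkpoint was attempted. */
    public boolean onIntervalTick() {
        if (closed.get()) {
            return false; // observer already closed: skip the write
        }
        checkpointAction.run();
        return true;
    }

    /** Idempotent close: only the first caller flips the flag and gets true. */
    public boolean close() {
        return closed.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        AtomicInteger writes = new AtomicInteger();
        ClosedFlagSketch sketch = new ClosedFlagSketch(writes::incrementAndGet);
        sketch.onIntervalTick();          // before close: checkpoint runs
        sketch.close();
        sketch.onIntervalTick();          // after close: skipped
        System.out.println(writes.get()); // 1
    }
}
```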

Comment on lines +78 to +83

autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block();
Thread.sleep(150);

Mockito.verify(context, Mockito.atLeastOnce()).checkpoint();
autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
Copilot AI Apr 11, 2026

This unit test uses Thread.sleep(...) to wait for Flux.interval ticks, which tends to be flaky and slow in CI. Prefer Reactor virtual time (e.g., VirtualTimeScheduler / StepVerifier.withVirtualTime) so the test can advance time deterministically without sleeping.

Comment on lines +116 to +121

autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block();
Thread.sleep(240);

Mockito.verify(context, Mockito.atLeast(2)).checkpoint();
autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
Copilot AI Apr 11, 2026

This unit test relies on Thread.sleep(...) timing to validate retry behavior, which can be flaky under load. Prefer virtual time (Reactor VirtualTimeScheduler / StepVerifier.withVirtualTime) and advance time deterministically instead of sleeping.

Comment on lines +146 to +151
firstCheckpoint.tryEmitValue(Mockito.mock(Lease.class));
Thread.sleep(30);

setField(autoCheckpointer, "lastCheckpointNanoTime", System.nanoTime() - Duration.ofSeconds(2).toNanos());
invokeCheckpointIfIntervalElapsed(autoCheckpointer).block();

Copilot AI Apr 11, 2026

This unit test uses Thread.sleep(...) to wait for async completion/interval evaluation, which can be flaky. Prefer virtual time and deterministic scheduling; if needed, consider making AutoCheckpointer accept a Scheduler for the interval so tests can use a VirtualTimeScheduler without sleeps.
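The testability point can be illustrated without Reactor. The sketch below is a plain-Java analogy, not the PR's implementation: the elapsed-interval decision reads time from an injected LongSupplier (a hypothetical parameter), so a test can advance a fake clock instead of sleeping. Reactor's VirtualTimeScheduler plays the analogous role for Flux.interval.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class IntervalGateSketch {
    private final long intervalNanos;
    private final LongSupplier nanoClock;   // injected time source (hypothetical)
    private long lastCheckpointNanoTime;

    public IntervalGateSketch(Duration interval, LongSupplier nanoClock) {
        this.intervalNanos = interval.toNanos();
        this.nanoClock = nanoClock;
        this.lastCheckpointNanoTime = nanoClock.getAsLong();
    }

    /** True when at least one full interval has passed since the last checkpoint. */
    public boolean intervalElapsed() {
        return nanoClock.getAsLong() - lastCheckpointNanoTime >= intervalNanos;
    }

    /** Records the current clock reading as the time of the last checkpoint. */
    public void markCheckpointed() {
        lastCheckpointNanoTime = nanoClock.getAsLong();
    }

    public static void main(String[] args) {
        AtomicLong fakeNanos = new AtomicLong(); // test-controlled clock
        IntervalGateSketch gate = new IntervalGateSketch(Duration.ofSeconds(30), fakeNanos::get);

        fakeNanos.addAndGet(Duration.ofSeconds(29).toNanos());
        System.out.println(gate.intervalElapsed()); // false

        fakeNanos.addAndGet(Duration.ofSeconds(1).toNanos());
        System.out.println(gate.intervalElapsed()); // true

        gate.markCheckpointed();
        System.out.println(gate.intervalElapsed()); // false
    }
}
```

In production the supplier would simply be System::nanoTime; only the tests swap in the fake clock.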

@github-actions
Contributor

API Change Check

APIView identified API level changes in this PR and created the following API reviews

com.azure:azure-cosmos

@dibahlfi
Member Author

@sdkReviewAgent-2

.then(this.afterProcessChanges(context));
.doOnError(throwable -> logger.warn(
"Unexpected exception from thread: " + Thread.currentThread().getId(), throwable))
.then(this.afterProcessChanges(context, docs));
Member

🟡 Recommendation — Concurrency: Eager assembly-time state mutation

.then(this.afterProcessChanges(context, docs));

afterProcessChanges(context, docs) is a plain Java method call — it evaluates eagerly when this line executes, not when the upstream Mono completes. The three imperative side-effects inside it (processedDocCount.addAndGet, latestProgressVersion.incrementAndGet, hasUncheckpointedProgress = true) execute at Mono assembly time, before .then() subscribes to anything.

Why this matters: Today this is safe because DefaultObserver.processChanges() runs the user callback synchronously inside its method body, so processing is done before .then() is reached. But ChangeFeedObserver.processChanges() returns Mono<Void> — its contract supports deferred/lazy processing. If a future observer returns a deferred Mono (e.g., Mono.defer(() -> { /* async work */ })), the side-effects would signal "progress complete" before the observer finishes, and the Flux.interval timer could checkpoint past unprocessed documents.

Suggested fix:

.then(Mono.defer(() -> this.afterProcessChanges(context, docs)));

This makes the side-effects lazy — they execute only after the upstream observer completes, regardless of whether the observer processes synchronously or asynchronously. One-line defensive fix that prevents a latent data-loss window.
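The eager-versus-deferred distinction can be reproduced without Reactor. The following is a plain-Java analogy under stated assumptions: Supplier stands in for Mono.defer, and the method names are hypothetical stand-ins rather than the PR's code. It records the order in which the "progress marked" side effect and the observer's work actually happen.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class EagerVsDeferredSketch {
    /** Stand-in for afterProcessChanges: the side effect fires when the method is called. */
    static Runnable afterProcessChanges(List<String> log) {
        log.add("progress marked");
        return () -> log.add("follow-up ran");
    }

    /** Eager: the caller already invoked afterProcessChanges to build the argument. */
    static void thenEager(Runnable upstream, Runnable followUp) {
        upstream.run();
        followUp.run();
    }

    /** Deferred: afterProcessChanges is invoked only after upstream has finished. */
    static void thenDeferred(Runnable upstream, Supplier<Runnable> followUp) {
        upstream.run();
        followUp.get().run();
    }

    public static List<String> eagerOrder() {
        List<String> log = new ArrayList<>();
        thenEager(() -> log.add("observer done"), afterProcessChanges(log));
        return log;
    }

    public static List<String> deferredOrder() {
        List<String> log = new ArrayList<>();
        thenDeferred(() -> log.add("observer done"), () -> afterProcessChanges(log));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(eagerOrder());    // [progress marked, observer done, follow-up ran]
        System.out.println(deferredOrder()); // [observer done, progress marked, follow-up ran]
    }
}
```

In the eager variant progress is marked before the observer runs, which is the ordering the comment warns about for lazily executed observers.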

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

- private Mono<Void> afterProcessChanges(ChangeFeedObserverContext<T> context) {
-     this.processedDocCount.incrementAndGet();
+ private Mono<Void> afterProcessChanges(ChangeFeedObserverContext<T> context, List<T> docs) {
+     this.processedDocCount.addAndGet(docs.size());
Member

🟡 Recommendation — Behavioral Change: processedDocCount now counts documents, not batches

this.processedDocCount.addAndGet(docs.size());

The old code was this.processedDocCount.incrementAndGet() — it counted batches. The new code counts individual documents. This changes the semantics of the processedDocumentCount threshold in CheckpointFrequency.

Why this matters: If CheckpointFrequency.withProcessedDocumentCount(10) is set, the old behavior would checkpoint every 10 batches; the new behavior checkpoints every 10 documents. The new behavior is arguably more correct (the field is named "processedDocumentCount"), but it's a silent semantic change. Currently the threshold path is dormant (no public API sets processedDocumentCount > 0), so there's no user-visible regression — but if this path is ever activated, the behavior will differ from what was there before.

Consider adding a brief code comment noting this was an intentional correction (documents, not batches) so future readers understand the choice.
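A small worked example of the difference, as a sketch only: the batch sizes are hypothetical, and the `>=` comparison against the threshold is an assumption about how the counter is checked, not something confirmed by the diff shown here.

```java
import java.util.Collections;
import java.util.List;

public class ThresholdCountingSketch {
    /**
     * Returns the 1-based index of the batch at which the counter first reaches
     * the threshold, or -1 if it never does.
     */
    public static int firstCheckpointBatch(List<Integer> batchSizes, int threshold, boolean countDocuments) {
        long counter = 0;
        for (int i = 0; i < batchSizes.size(); i++) {
            // Old behavior adds 1 per batch; new behavior adds the batch's document count.
            counter += countDocuments ? batchSizes.get(i) : 1;
            if (counter >= threshold) {
                return i + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Integer> batches = Collections.nCopies(12, 4); // 12 batches of 4 documents each
        System.out.println(firstCheckpointBatch(batches, 10, false)); // 10 (counting batches)
        System.out.println(firstCheckpointBatch(batches, 10, true));  // 3  (counting documents)
    }
}
```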

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

}

@Override
public void close(ChangeFeedObserverContext<T> context, ChangeFeedObserverCloseReason reason) {
Member

🟢 Suggestion — Design: No best-effort final checkpoint on close

this.stopIntervalCheckpointing();

With time-interval checkpointing, close() stops the timer but never attempts a final checkpoint for any uncheckpointed progress. On clean shutdown or lease rebalancing, up to one full maxCheckpointDelay interval's worth of documents will be re-processed by the next lease owner.

Why this matters: For a 30-second checkpoint interval with high throughput, this could mean thousands of documents unnecessarily re-processed — consuming RUs and processing time. The old every-batch strategy never had this problem. While the PR's design intentionally accepts some replay (bounded by maxCheckpointDelay), a best-effort final checkpoint on SHUTDOWN would minimize unnecessary reprocessing during orderly transitions (lease rebalancing, graceful shutdown).

Possible enhancement (not blocking):

public void close(ChangeFeedObserverContext<T> context, ChangeFeedObserverCloseReason reason) {
    this.stopIntervalCheckpointing();
    if (this.hasUncheckpointedProgress && reason == ChangeFeedObserverCloseReason.SHUTDOWN) {
        ChangeFeedObserverContext<T> ctx = this.latestContext.get();
        if (ctx != null) {
            try { this.checkpointOnce(ctx).block(Duration.ofSeconds(5)); }
            catch (Exception e) { logger.warn("Best-effort final checkpoint failed", e); }
        }
    }
    this.observer.close(context, reason);
}

This is a separate concern from the existing comment about the close/checkpoint race — this is about proactively flushing pending work, not about an in-flight checkpoint racing with close.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

/**
* Checkpoint strategy that writes a checkpoint after each processed batch.
*/
public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy {
Member

🟡 Recommendation — API Design: EveryBatchCheckpointStrategy is semantically identical to null

public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy {

This public class has zero fields and zero methods. null already means "every batch" — CheckpointFrequencyFactory.fromOptions() treats both identically (returning new CheckpointFrequency()), and the getCheckpointStrategy() Javadoc says "returns null when default every-batch checkpointing is used."

Why this matters:

  1. Dual code paths: Every consumer dispatching on strategy type (the factory, the builder validation) must handle both null AND EveryBatchCheckpointStrategy for the same behavior.
  2. User confusion: Which should customers use — null (the default) or new EveryBatchCheckpointStrategy()?
  3. Precedent mismatch: The sibling AvailabilityStrategy hierarchy has only one subclass (ThresholdBasedAvailabilityStrategy) — no "default/no-op" marker class.
  4. API surface cost: Public final class is committed API. Removing it later is a breaking change.

Suggestion: Consider removing this class. null already conveys "use the default (every-batch)." If an explicit sentinel is needed for telemetry or logging, a static factory method like ChangeFeedCheckpointStrategy.everyBatch() could return a package-private marker without inflating the public API.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

private CheckpointFrequencyFactory() {
}

public static CheckpointFrequency fromOptions(ChangeFeedProcessorOptions options) {
Member

🟢 Suggestion — Testing: CheckpointFrequencyFactory has zero unit tests

public static CheckpointFrequency fromOptions(ChangeFeedProcessorOptions options) {

This factory has 5 distinct code paths (null options, null strategy, EveryBatchCheckpointStrategy, TimeIntervalCheckpointStrategy, unknown subclass → exception) and none are directly tested. It's the sole bridge between the public API and the internal checkpointing engine — a mapping error here would silently misconfigure checkpointing for all users.

Why this matters: AutoCheckpointerTests exercise the checkpointing engine, but they construct CheckpointFrequency directly and bypass the factory entirely. No test verifies the chain new TimeIntervalCheckpointStrategy(Duration.ofSeconds(5)) → CheckpointFrequencyFactory.fromOptions(options) → CheckpointFrequency with timeInterval == 5s. A regression in this mapping would break the entire feature with no test catching it.

Suggestion: Add a CheckpointFrequencyFactoryTests class covering:

  1. fromOptions(null) → default frequency (every batch)
  2. fromOptions(options with null strategy) → default frequency
  3. fromOptions(options with EveryBatchCheckpointStrategy) → default frequency
  4. fromOptions(options with TimeIntervalCheckpointStrategy(30s)) → frequency with timeInterval == 30s
  5. Unknown strategy type → IllegalArgumentException

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.


@Test(groups = {"unit"})
@SuppressWarnings("unchecked")
public void timeIntervalCheckpointStrategyUsesBackgroundTimer() throws InterruptedException {
Member

🟢 Suggestion — Testing: Missing "no progress = no checkpoint" test

The timeIntervalCheckpointStrategyUsesBackgroundTimer test always processes a document before waiting, so hasUncheckpointedProgress is always true. There's no test verifying the guard at line 141 (!this.hasUncheckpointedProgress) — the core mechanism that prevents spurious checkpoint writes when no new batches have arrived since the last checkpoint.

Why this matters: The entire value proposition of this PR is reducing RU cost by checkpointing less often. If the hasUncheckpointedProgress guard is accidentally removed, every interval tick would write a redundant checkpoint, defeating the purpose. A test that verifies "timer fires but no checkpoint called when no docs processed" would protect this invariant.

Suggested test:

@Test(groups = {"unit"})
public void intervalTimerDoesNotCheckpointWithoutProgress() throws Exception {
    // Setup with time interval
    AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(
        new CheckpointFrequency().withTimeInterval(Duration.ofMillis(30)), observer);
    autoCheckpointer.open(context);

    // Wait for 2+ interval ticks without processing any changes
    Thread.sleep(100);

    // Checkpoint should never have been called
    Mockito.verify(context, Mockito.never()).checkpoint();
    autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
}

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

* @return the {@link ChangeFeedProcessorOptions}.
*/
public ChangeFeedProcessorOptions setCheckpointStrategy(ChangeFeedCheckpointStrategy checkpointStrategy) {
checkNotNull(checkpointStrategy, "Argument 'checkpointStrategy' can not be null");
Member

🟢 Suggestion — API Ergonomics: setCheckpointStrategy(null) rejected but default is null

checkNotNull(checkpointStrategy, "Argument 'checkpointStrategy' can not be null");

The default value of checkpointStrategy is null (meaning "every batch"), but calling setCheckpointStrategy(null) throws. Once a strategy is set, passing null cannot reset it to the default. Other setters in this class that take a Duration (e.g., setLeaseRenewInterval, setFeedPollDelay) silently accept null.

Why this matters: Minor ergonomics issue. A user who configures TimeIntervalCheckpointStrategy and wants to revert to default must construct a fresh options object. The workaround is setCheckpointStrategy(new EveryBatchCheckpointStrategy()), which works but relies on a class that is semantically identical to null (see related comment on that class).

Consider either allowing null (to reset to default, consistent with sibling setters) or documenting this as write-once behavior.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

@xinlian12
Member

Review complete (47:54)

Posted 7 inline comment(s).

Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage

@dibahlfi
Member Author

@sdkReviewAgent-2

this.lastCheckpointNanoTime = System.nanoTime();

// Keep progress markers if newer batches arrived while this checkpoint was in flight.
if (this.latestProgressVersion.get() == checkpointedProgressVersion) {
Member

🟡 Recommendation — Concurrency: TOCTOU race in onCheckpointSuccess can silently discard a batch's progress tracking

if (this.latestProgressVersion.get() == checkpointedProgressVersion) {
    this.processedDocCount.set(0);
    this.hasUncheckpointedProgress = false;
}

This check-then-act across three non-atomic fields has a race window. The latestProgressVersion check at line 182 and the subsequent clears at lines 183–184 are not atomic — a concurrent afterProcessChanges call from the processor thread can interleave between them.

Concrete race scenario:

  1. Checkpoint thread (onCheckpointSuccess(V)): reads latestProgressVersion.get() == V → true
  2. Processor thread (afterProcessChanges for batch V+1): latestProgressVersion.incrementAndGet() → V+1, processedDocCount.addAndGet(N), hasUncheckpointedProgress = true
  3. Checkpoint thread: processedDocCount.set(0) — overwrites batch V+1's document count
  4. Checkpoint thread: hasUncheckpointedProgress = false — clears batch V+1's dirty flag

Consequence: Batch V+1 was processed by the observer, but its progress tracking is now lost. The background timer won't checkpoint it until the next batch arrives and re-sets the flag. If the process crashes in this window, batch V+1 is re-processed (duplicate processing, not data loss). More importantly, the replay window exceeds the configured maxCheckpointDelay by one batch — undermining the interval guarantee.

This is self-healing on next batch arrival, but the test checkpointSuccessDoesNotClearNewerProgressArrivingDuringInFlightCheckpoint doesn't cover this interleaving — it only tests the case where both batches arrive before the checkpoint completes, so latestProgressVersion is already past the checkpointed version.

Possible mitigation: use compareAndSet for the version check:

private void onCheckpointSuccess(long checkpointedProgressVersion) {
    this.lastCheckpointNanoTime = System.nanoTime();
    if (this.latestProgressVersion.compareAndSet(checkpointedProgressVersion, checkpointedProgressVersion)) {
        // Only clear if version hasn't changed since we checked
        this.processedDocCount.set(0);
        this.hasUncheckpointedProgress = false;
    }
}

Note: compareAndSet(V, V) leaves the value unchanged and fails if the version has already advanced past V. On its own it behaves like the get() == V comparison: the two clears still run as separate steps after the check, so a concurrent afterProcessChanges can interleave between them. Closing the window completely would need the dirty state folded into a single atomic value, or both paths guarded by the same lock.
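One way to avoid the check-then-clear pattern altogether is to derive the dirty state from two monotonic counters instead of resetting a flag. The sketch below is an illustration under stated assumptions, not the PR's code: checkpointedVersion and beginCheckpoint are hypothetical names, and processedDocCount is left out for brevity. Because nothing is ever cleared, a concurrent batch cannot have its progress erased; a stale read can at worst delay a checkpoint until the next tick.

```java
import java.util.concurrent.atomic.AtomicLong;

public class VersionedProgressSketch {
    private final AtomicLong latestProgressVersion = new AtomicLong();
    private final AtomicLong checkpointedVersion = new AtomicLong(); // hypothetical field

    /** Processor thread: a batch finished. Returns the new progress version. */
    public long afterProcessChanges() {
        return latestProgressVersion.incrementAndGet();
    }

    /** Checkpoint thread: capture the version this checkpoint will cover. */
    public long beginCheckpoint() {
        return latestProgressVersion.get();
    }

    /** Checkpoint thread: record success; the checkpointed version never moves backwards. */
    public void onCheckpointSuccess(long coveredVersion) {
        checkpointedVersion.accumulateAndGet(coveredVersion, Math::max);
    }

    /** Dirty state is derived from the two counters, so there is no flag to clear. */
    public boolean hasUncheckpointedProgress() {
        return latestProgressVersion.get() > checkpointedVersion.get();
    }

    public static void main(String[] args) {
        VersionedProgressSketch state = new VersionedProgressSketch();
        state.afterProcessChanges();                 // batch 1 processed
        long inFlight = state.beginCheckpoint();     // checkpoint covering version 1 starts
        state.afterProcessChanges();                 // batch 2 arrives while it is in flight
        state.onCheckpointSuccess(inFlight);         // checkpoint for version 1 completes
        System.out.println(state.hasUncheckpointedProgress()); // true: batch 2 is still pending

        state.onCheckpointSuccess(state.beginCheckpoint());
        System.out.println(state.hasUncheckpointedProgress()); // false
    }
}
```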

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

@xinlian12
Member

Review complete (47:48)

Posted 1 inline comment(s).

Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage
