Conversation
Pull request overview
This PR introduces a configurable checkpointing strategy for the Cosmos Change Feed Processor (CFP) to reduce lease-container RU consumption by checkpointing less frequently while aiming to bound the replay window by time.
Changes:
- Adds public checkpoint strategy types (`EveryBatchCheckpointStrategy`, `TimeIntervalCheckpointStrategy`) and wires them into `ChangeFeedProcessorOptions`.
- Introduces an internal `CheckpointFrequencyFactory` to map public strategy choices to the existing internal `CheckpointFrequency`.
- Updates `AutoCheckpointer` to support time-interval-based checkpointing via a background timer, with new unit tests covering interval behavior and concurrency edge cases.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TimeIntervalCheckpointStrategy.java | New public strategy type for time-bounded checkpointing. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/EveryBatchCheckpointStrategy.java | New public strategy type representing “checkpoint every batch”. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedCheckpointStrategy.java | New public base type for checkpoint strategies. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java | Adds checkpoint strategy option (setter/getter). |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedProcessorBuilder.java | Validates time-interval strategy against lease expiration interval. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/CheckpointFrequencyFactory.java | Maps public strategies to internal CheckpointFrequency. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/AutoCheckpointer.java | Implements interval-based background checkpointing and single-flight checkpointing. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/IncrementalChangeFeedProcessorImpl.java | Uses CheckpointFrequencyFactory instead of default CheckpointFrequency. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java | Uses CheckpointFrequencyFactory instead of default CheckpointFrequency. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/common/AutoCheckpointerTests.java | New unit tests for interval checkpointing behavior. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ChangeFeedCheckpointStrategyTests.java | New unit tests for public strategy/options validation. |
```java
private void startIntervalCheckpointing() {
    Duration interval = this.checkpointFrequency.getTimeInterval();
    if (interval == null) {
        return;
    }

    this.intervalCheckpointDisposable = Flux.interval(interval, interval)
        .concatMap(ignored -> this.checkpointIfIntervalElapsed()
```
The interval checkpointing uses Flux.interval(interval, interval) without specifying a Scheduler, so it will run on Reactor’s default timer/parallel scheduler rather than the ChangeFeedProcessorOptions scheduler. This can surprise users who set a custom scheduler and also creates one periodic task per lease even when there is no uncheckpointed progress. Consider injecting/propagating the CFP scheduler into AutoCheckpointer and using it for the interval, and optionally starting/stopping the timer only while hasUncheckpointedProgress is true to reduce background overhead.
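The injected-scheduler idea can be illustrated with a plain-JDK analogy. This sketch uses `ScheduledExecutorService` in place of a Reactor `Scheduler`; the class and method names are hypothetical, not the SDK's:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the periodic checkpoint task runs on a scheduler the
// caller injects, instead of a library-chosen default thread pool.
class InjectedIntervalTimer implements AutoCloseable {
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> task;

    InjectedIntervalTimer(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler; // caller controls which threads run the ticks
    }

    void start(long intervalMillis, Runnable onTick) {
        task = scheduler.scheduleAtFixedRate(
            onTick, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        if (task != null) {
            task.cancel(false); // stop future ticks; let an in-flight tick finish
        }
    }
}
```

In Reactor terms, the equivalent would be passing the CFP options scheduler as the third argument of `Flux.interval(delay, period, scheduler)` rather than letting it default.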
```java
@Override
public void close(ChangeFeedObserverContext<T> context, ChangeFeedObserverCloseReason reason) {
    this.stopIntervalCheckpointing();
    this.observer.close(context, reason);
}
```
close(...) disposes the interval stream, but an interval tick (or an in-flight checkpointOnce) can race with close and still call context.checkpoint() after the observer has been closed. To avoid checkpointing after shutdown/lease loss, consider adding a closed flag (set in close) and checking it in checkpointIfIntervalElapsed/checkpointOnce, and/or cancelling any in-flight checkpoint subscription on close if that’s safe.
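The closed-flag suggestion can be sketched with an `AtomicBoolean`. This is a minimal illustration under assumed names; `markClosed` and `mayCheckpoint` are not existing SDK methods:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard: close() flips the flag exactly once, and every
// checkpoint path checks it before touching the observer context.
class CheckpointCloseGuard {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Returns true only for the single caller that transitions open -> closed. */
    boolean markClosed() {
        return closed.compareAndSet(false, true);
    }

    /** Interval ticks and in-flight checkpoints consult this before calling context.checkpoint(). */
    boolean mayCheckpoint() {
        return !closed.get();
    }
}
```

With such a guard, a tick that races with `close` observes `mayCheckpoint() == false` and skips the write instead of checkpointing after lease loss.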
```java
autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block();
Thread.sleep(150);

Mockito.verify(context, Mockito.atLeastOnce()).checkpoint();
autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
```
This unit test uses Thread.sleep(...) to wait for Flux.interval ticks, which tends to be flaky and slow in CI. Prefer Reactor virtual time (e.g., VirtualTimeScheduler / StepVerifier.withVirtualTime) so the test can advance time deterministically without sleeping.
```java
autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block();
Thread.sleep(240);

Mockito.verify(context, Mockito.atLeast(2)).checkpoint();
autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
```
This unit test relies on Thread.sleep(...) timing to validate retry behavior, which can be flaky under load. Prefer virtual time (Reactor VirtualTimeScheduler / StepVerifier.withVirtualTime) and advance time deterministically instead of sleeping.
```java
firstCheckpoint.tryEmitValue(Mockito.mock(Lease.class));
Thread.sleep(30);

setField(autoCheckpointer, "lastCheckpointNanoTime", System.nanoTime() - Duration.ofSeconds(2).toNanos());
invokeCheckpointIfIntervalElapsed(autoCheckpointer).block();
```
This unit test uses Thread.sleep(...) to wait for async completion/interval evaluation, which can be flaky. Prefer virtual time and deterministic scheduling; if needed, consider making AutoCheckpointer accept a Scheduler for the interval so tests can use a VirtualTimeScheduler without sleeps.
API Change Check: APIView identified API-level changes in this PR and created the following API reviews.
```diff
-    .then(this.afterProcessChanges(context));
+    .doOnError(throwable -> logger.warn(
+        "Unexpected exception from thread: " + Thread.currentThread().getId(), throwable))
+    .then(this.afterProcessChanges(context, docs));
```
🟡 Recommendation — Concurrency: Eager assembly-time state mutation

```java
.then(this.afterProcessChanges(context, docs));
```

`afterProcessChanges(context, docs)` is a plain Java method call — it evaluates eagerly when this line executes, not when the upstream Mono completes. The three imperative side effects inside it (`processedDocCount.addAndGet`, `latestProgressVersion.incrementAndGet`, `hasUncheckpointedProgress = true`) execute at Mono assembly time, before `.then()` subscribes to anything.

Why this matters: Today this is safe because `DefaultObserver.processChanges()` runs the user callback synchronously inside its method body, so processing is done before `.then()` is reached. But `ChangeFeedObserver.processChanges()` returns `Mono<Void>` — its contract supports deferred/lazy processing. If a future observer returns a deferred Mono (e.g., `Mono.defer(() -> { /* async work */ })`), the side effects would signal "progress complete" before the observer finishes, and the `Flux.interval` timer could checkpoint past unprocessed documents.

Suggested fix:

```java
.then(Mono.defer(() -> this.afterProcessChanges(context, docs)));
```

This makes the side effects lazy — they execute only after the upstream observer completes, regardless of whether the observer processes synchronously or asynchronously. A one-line defensive fix that prevents a latent data-loss window.
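The eager-vs-deferred distinction is independent of Reactor and can be shown with plain Java. In this sketch a `Supplier` stands in for `Mono.defer`, and the names are illustrative:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class EagerVsDeferred {
    static final AtomicBoolean progressMarked = new AtomicBoolean(false);

    // Stand-in for afterProcessChanges: a plain method whose side effect
    // fires the moment the method is CALLED, not when its result is used.
    static Runnable afterProcessChanges() {
        progressMarked.set(true);
        return () -> { };
    }

    public static void main(String[] args) {
        // Eager: merely building the pipeline already ran the side effect.
        Runnable eager = afterProcessChanges();
        System.out.println(progressMarked.get()); // true, before eager ever runs

        progressMarked.set(false);
        // Deferred: wrapping the call delays the side effect until get(),
        // mirroring Mono.defer(...) deferring work to subscription time.
        Supplier<Runnable> deferred = EagerVsDeferred::afterProcessChanges;
        System.out.println(progressMarked.get()); // false: nothing ran yet
        deferred.get();
        System.out.println(progressMarked.get()); // true: side effect ran on demand
    }
}
```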
```diff
-private Mono<Void> afterProcessChanges(ChangeFeedObserverContext<T> context) {
-    this.processedDocCount.incrementAndGet();
+private Mono<Void> afterProcessChanges(ChangeFeedObserverContext<T> context, List<T> docs) {
+    this.processedDocCount.addAndGet(docs.size());
```
🟡 Recommendation — Behavioral Change: `processedDocCount` now counts documents, not batches

```java
this.processedDocCount.addAndGet(docs.size());
```

The old code was `this.processedDocCount.incrementAndGet()` — it counted batches. The new code counts individual documents. This changes the semantics of the `processedDocumentCount` threshold in `CheckpointFrequency`.

Why this matters: If `CheckpointFrequency.withProcessedDocumentCount(10)` is set, the old behavior would checkpoint every 10 batches; the new behavior checkpoints every 10 documents. The new behavior is arguably more correct (the field is named `processedDocumentCount`), but it's a silent semantic change. Currently the threshold path is dormant (no public API sets `processedDocumentCount > 0`), so there's no user-visible regression — but if this path is ever activated, the behavior will differ from what was there before.

Consider adding a brief code comment noting this was an intentional correction (documents, not batches) so future readers understand the choice.
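The semantic difference can be pinned down in a few lines. This is illustrative only; the `AtomicLong` mirrors the counter field in `AutoCheckpointer`:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class CountSemantics {
    public static void main(String[] args) {
        List<String> batch = List.of("doc-a", "doc-b", "doc-c");

        AtomicLong oldCounter = new AtomicLong(); // old: one increment per batch
        AtomicLong newCounter = new AtomicLong(); // new: one increment per document

        oldCounter.incrementAndGet();
        newCounter.addAndGet(batch.size());

        // With a threshold of 3, the old code needs three batches to trigger a
        // checkpoint; the new code triggers after this single three-doc batch.
        System.out.println(oldCounter.get()); // 1
        System.out.println(newCounter.get()); // 3
    }
}
```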
```java
}

@Override
public void close(ChangeFeedObserverContext<T> context, ChangeFeedObserverCloseReason reason) {
```
🟢 Suggestion — Design: No best-effort final checkpoint on close

```java
this.stopIntervalCheckpointing();
```

With time-interval checkpointing, `close()` stops the timer but never attempts a final checkpoint for any uncheckpointed progress. On clean shutdown or lease rebalancing, up to one full `maxCheckpointDelay` interval's worth of documents will be re-processed by the next lease owner.

Why this matters: For a 30-second checkpoint interval with high throughput, this could mean thousands of documents unnecessarily re-processed — consuming RUs and processing time. The old every-batch strategy never had this problem. While the PR's design intentionally accepts some replay (bounded by `maxCheckpointDelay`), a best-effort final checkpoint on SHUTDOWN would minimize unnecessary reprocessing during orderly transitions (lease rebalancing, graceful shutdown).

Possible enhancement (not blocking):

```java
public void close(ChangeFeedObserverContext<T> context, ChangeFeedObserverCloseReason reason) {
    this.stopIntervalCheckpointing();
    if (this.hasUncheckpointedProgress && reason == ChangeFeedObserverCloseReason.SHUTDOWN) {
        ChangeFeedObserverContext<T> ctx = this.latestContext.get();
        if (ctx != null) {
            try { this.checkpointOnce(ctx).block(Duration.ofSeconds(5)); }
            catch (Exception e) { logger.warn("Best-effort final checkpoint failed", e); }
        }
    }
    this.observer.close(context, reason);
}
```

This is a separate concern from the existing comment about the close/checkpoint race — this is about proactively flushing pending work, not about an in-flight checkpoint racing with close.
```java
/**
 * Checkpoint strategy that writes a checkpoint after each processed batch.
 */
public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy {
```
🟡 Recommendation — API Design: `EveryBatchCheckpointStrategy` is semantically identical to `null`

```java
public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy {
```

This public class has zero fields and zero methods. `null` already means "every batch" — `CheckpointFrequencyFactory.fromOptions()` treats both identically (returning `new CheckpointFrequency()`), and the `getCheckpointStrategy()` Javadoc says "returns null when default every-batch checkpointing is used."

Why this matters:
- Dual code paths: Every consumer dispatching on strategy type (the factory, the builder validation) must handle both `null` AND `EveryBatchCheckpointStrategy` for the same behavior.
- User confusion: Which should customers use — `null` (the default) or `new EveryBatchCheckpointStrategy()`?
- Precedent mismatch: The sibling `AvailabilityStrategy` hierarchy has only one subclass (`ThresholdBasedAvailabilityStrategy`) — no "default/no-op" marker class.
- API surface cost: A public `final` class is committed API. Removing it later is a breaking change.

Suggestion: Consider removing this class. `null` already conveys "use the default (every-batch)." If an explicit sentinel is needed for telemetry or logging, a static factory method like `ChangeFeedCheckpointStrategy.everyBatch()` could return a package-private marker without inflating the public API.
```java
private CheckpointFrequencyFactory() {
}

public static CheckpointFrequency fromOptions(ChangeFeedProcessorOptions options) {
```
🟢 Suggestion — Testing: `CheckpointFrequencyFactory` has zero unit tests

```java
public static CheckpointFrequency fromOptions(ChangeFeedProcessorOptions options) {
```

This factory has 5 distinct code paths (null options, null strategy, `EveryBatchCheckpointStrategy`, `TimeIntervalCheckpointStrategy`, unknown subclass → exception) and none are directly tested. It's the sole bridge between the public API and the internal checkpointing engine — a mapping error here would silently misconfigure checkpointing for all users.

Why this matters: The factory is tested indirectly through `AutoCheckpointerTests`, but those tests construct `CheckpointFrequency` directly and bypass the factory entirely. No test verifies that `new TimeIntervalCheckpointStrategy(Duration.ofSeconds(5))` → `CheckpointFrequencyFactory.fromOptions(options)` → `CheckpointFrequency` with `timeInterval == 5s`. A regression in this mapping would break the entire feature with no test catching it.

Suggestion: Add a `CheckpointFrequencyFactoryTests` class covering:
- `fromOptions(null)` → default frequency (every batch)
- `fromOptions(options with null strategy)` → default frequency
- `fromOptions(options with EveryBatchCheckpointStrategy)` → default frequency
- `fromOptions(options with TimeIntervalCheckpointStrategy(30s))` → frequency with `timeInterval == 30s`
- Unknown strategy type → `IllegalArgumentException`
```java
@Test(groups = {"unit"})
@SuppressWarnings("unchecked")
public void timeIntervalCheckpointStrategyUsesBackgroundTimer() throws InterruptedException {
```
🟢 Suggestion — Testing: Missing "no progress = no checkpoint" test

The `timeIntervalCheckpointStrategyUsesBackgroundTimer` test always processes a document before waiting, so `hasUncheckpointedProgress` is always true. There's no test verifying the guard at line 141 (`!this.hasUncheckpointedProgress`) — the core mechanism that prevents spurious checkpoint writes when no new batches have arrived since the last checkpoint.

Why this matters: The entire value proposition of this PR is reducing RU cost by checkpointing less often. If the `hasUncheckpointedProgress` guard is accidentally removed, every interval tick would write a redundant checkpoint, defeating the purpose. A test that verifies "timer fires but no checkpoint called when no docs processed" would protect this invariant.
Suggested test:

```java
@Test(groups = {"unit"})
public void intervalTimerDoesNotCheckpointWithoutProgress() throws Exception {
    // Setup with time interval
    AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(
        new CheckpointFrequency().withTimeInterval(Duration.ofMillis(30)), observer);
    autoCheckpointer.open(context);
    // Wait for 2+ interval ticks without processing any changes
    Thread.sleep(100);
    // Checkpoint should never have been called
    Mockito.verify(context, Mockito.never()).checkpoint();
    autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
}
```

```java
 * @return the {@link ChangeFeedProcessorOptions}.
 */
public ChangeFeedProcessorOptions setCheckpointStrategy(ChangeFeedCheckpointStrategy checkpointStrategy) {
    checkNotNull(checkpointStrategy, "Argument 'checkpointStrategy' can not be null");
```
🟢 Suggestion — API Ergonomics: `setCheckpointStrategy(null)` rejected but default is null

```java
checkNotNull(checkpointStrategy, "Argument 'checkpointStrategy' can not be null");
```

The default value of `checkpointStrategy` is `null` (meaning "every batch"), but calling `setCheckpointStrategy(null)` throws. Once a strategy is set, it cannot be reset to the default without creating a new `ChangeFeedProcessorOptions` instance. Other `Duration` setters in this class (e.g., `setLeaseRenewInterval`, `setFeedPollDelay`) silently accept null.

Why this matters: Minor ergonomics issue. A user who configures `TimeIntervalCheckpointStrategy` and wants to revert to default must construct a fresh options object. The workaround is `setCheckpointStrategy(new EveryBatchCheckpointStrategy())`, which works but relies on a class that is semantically identical to null (see related comment on that class).

Consider either allowing null (to reset to default, consistent with sibling setters) or documenting this as write-once behavior.
✅ Review complete (47:54). Posted 7 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage

@sdkReviewAgent-2
```java
this.lastCheckpointNanoTime = System.nanoTime();

// Keep progress markers if newer batches arrived while this checkpoint was in flight.
if (this.latestProgressVersion.get() == checkpointedProgressVersion) {
```
🟡 Recommendation — Concurrency: TOCTOU race in `onCheckpointSuccess` can silently discard a batch's progress tracking

```java
if (this.latestProgressVersion.get() == checkpointedProgressVersion) {
    this.processedDocCount.set(0);
    this.hasUncheckpointedProgress = false;
}
```

This check-then-act across three non-atomic fields has a race window. The `latestProgressVersion` check at line 182 and the subsequent clears at lines 183–184 are not atomic — a concurrent `afterProcessChanges` call from the processor thread can interleave between them.

Concrete race scenario:
1. Checkpoint thread (`onCheckpointSuccess(V)`): reads `latestProgressVersion.get() == V` → true
2. Processor thread (`afterProcessChanges` for batch V+1): `latestProgressVersion.incrementAndGet()` → V+1, `processedDocCount.addAndGet(N)`, `hasUncheckpointedProgress = true`
3. Checkpoint thread: `processedDocCount.set(0)` — overwrites batch V+1's document count
4. Checkpoint thread: `hasUncheckpointedProgress = false` — clears batch V+1's dirty flag

Consequence: Batch V+1 was processed by the observer, but its progress tracking is now lost. The background timer won't checkpoint it until the next batch arrives and re-sets the flag. If the process crashes in this window, batch V+1 is re-processed (duplicate processing, not data loss). More importantly, the replay window exceeds the configured `maxCheckpointDelay` by one batch — undermining the interval guarantee.

This is self-healing on next batch arrival, but the test `checkpointSuccessDoesNotClearNewerProgressArrivingDuringInFlightCheckpoint` doesn't cover this interleaving — it only tests the case where both batches arrive before the checkpoint completes, so `latestProgressVersion` is already past the checkpointed version.

Possible fix — use `compareAndSet` to make the version check and clear atomic with respect to concurrent updates:

```java
private void onCheckpointSuccess(long checkpointedProgressVersion) {
    this.lastCheckpointNanoTime = System.nanoTime();
    if (this.latestProgressVersion.compareAndSet(checkpointedProgressVersion, checkpointedProgressVersion)) {
        // Only clear if version hasn't changed since we checked
        this.processedDocCount.set(0);
        this.hasUncheckpointedProgress = false;
    }
}
```

Note: `compareAndSet(V, V)` doesn't change the value but fails if the value was modified by another thread between the checkpoint's version capture and this call — making it a tighter guard against the race.
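The behavior of the `compareAndSet(V, V)` guard can be exercised in isolation with a plain `AtomicLong`; the values here are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

public class CasGuardDemo {
    public static void main(String[] args) {
        AtomicLong latestProgressVersion = new AtomicLong(5);

        // No concurrent update: the version is still 5, so the guard passes
        // and the clears would be allowed to run.
        System.out.println(latestProgressVersion.compareAndSet(5, 5)); // true

        // Simulate afterProcessChanges for batch V+1 racing in before the clears.
        latestProgressVersion.incrementAndGet(); // now 6

        // The guard now fails, so the newer batch's progress markers survive.
        System.out.println(latestProgressVersion.compareAndSet(5, 5)); // false
    }
}
```

Note that the CAS narrows the window rather than closing it entirely: an increment that lands between a successful CAS and the subsequent clears can still be overwritten, which matches the review's "tighter guard" framing.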
✅ Review complete (47:48). Posted 1 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage
By default, CFP writes a checkpoint after every single batch. This is the safest behavior — the replay window is at most one batch — but every checkpoint is a write that costs RUs on the lease container.
The problem: In high-throughput workloads, the lease container can consume significant RUs just for checkpoint writes. Customers asked for a way to checkpoint less often while still bounding the replay window to a predictable duration.