Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;

public class ChangeFeedCheckpointStrategyTests {

@Test(groups = {"unit"})
public void optionsDefaultCheckpointStrategyIsNull() {
ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();

Assert.assertNull(options.getCheckpointStrategy());
}

@Test(groups = {"unit"})
public void setCheckpointStrategyNullThrows() {
ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();

Assert.assertThrows(NullPointerException.class, () -> options.setCheckpointStrategy(null));
}

@Test(groups = {"unit"})
public void timeIntervalCheckpointStrategyRequiresPositiveDuration() {
Assert.assertThrows(NullPointerException.class, () -> new TimeIntervalCheckpointStrategy(null));
Assert.assertThrows(IllegalArgumentException.class, () -> new TimeIntervalCheckpointStrategy(Duration.ZERO));
Assert.assertThrows(IllegalArgumentException.class, () -> new TimeIntervalCheckpointStrategy(Duration.ofSeconds(-1)));
}

@Test(groups = {"unit"})
public void builderRejectsTimeIntervalCheckpointDelayGreaterThanOrEqualToLeaseExpirationInterval() {
ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions()
.setLeaseExpirationInterval(Duration.ofSeconds(30))
.setCheckpointStrategy(new TimeIntervalCheckpointStrategy(Duration.ofSeconds(30)));

ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder()
.hostName("host-1")
.feedContainer(Mockito.mock(CosmosAsyncContainer.class))
.leaseContainer(Mockito.mock(CosmosAsyncContainer.class))
.handleChanges(changes -> {
})
.options(options);

Assert.assertThrows(IllegalArgumentException.class, builder::buildChangeFeedProcessor);
}

@Test(groups = {"unit"})
public void builderAcceptsTimeIntervalCheckpointDelayLessThanLeaseExpirationInterval() {
ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions()
.setLeaseExpirationInterval(Duration.ofSeconds(30))
.setCheckpointStrategy(new TimeIntervalCheckpointStrategy(Duration.ofSeconds(10)));

ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder()
.hostName("host-1")
.feedContainer(Mockito.mock(CosmosAsyncContainer.class))
.leaseContainer(Mockito.mock(CosmosAsyncContainer.class))
.handleChanges(changes -> {
if (changes == null) {
throw new IllegalStateException("changes should not be null");
}
})
.options(options);

try {
Method method = ChangeFeedProcessorBuilder.class.getDeclaredMethod("validateChangeFeedProcessorOptions");
method.setAccessible(true);
method.invoke(builder);
} catch (InvocationTargetException e) {
Assert.fail("Expected no exception but got: " + e.getCause(), e.getCause());
} catch (Exception e) {
Assert.fail("Expected no exception but got: " + e.getMessage(), e);
}
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.changefeed.common;

import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverCloseReason;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext;
import com.azure.cosmos.implementation.changefeed.CheckpointFrequency;
import com.azure.cosmos.implementation.changefeed.Lease;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;

public class AutoCheckpointerTests {

@SuppressWarnings("unchecked")
private static <T> T getField(Object target, String fieldName) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return (T) field.get(target);
}

private static void setField(Object target, String fieldName, Object value) throws Exception {
Field field = target.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(target, value);
}

@SuppressWarnings("unchecked")
private static Mono<Void> invokeCheckpointIfIntervalElapsed(AutoCheckpointer<?> autoCheckpointer) throws Exception {
Method method = AutoCheckpointer.class.getDeclaredMethod("checkpointIfIntervalElapsed");
method.setAccessible(true);
return (Mono<Void>) method.invoke(autoCheckpointer);
}

@Test(groups = {"unit"})
@SuppressWarnings("unchecked")
public void everyBatchCheckpointStrategyCheckpointsAfterEachBatch() {
ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class);

Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty());
Mockito.when(context.checkpoint()).thenReturn(Mono.empty());

AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(new CheckpointFrequency(), observer);
autoCheckpointer.open(context);

autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block();

Mockito.verify(context, times(1)).checkpoint();
autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
}

@Test(groups = {"unit"})
@SuppressWarnings("unchecked")
public void timeIntervalCheckpointStrategyUsesBackgroundTimer() throws InterruptedException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 Suggestion — Testing: Missing "no progress = no checkpoint" test

The timeIntervalCheckpointStrategyUsesBackgroundTimer test always processes a document before waiting, so hasUncheckpointedProgress is always true. There's no test verifying the guard at line 141 (!this.hasUncheckpointedProgress) — the core mechanism that prevents spurious checkpoint writes when no new batches have arrived since the last checkpoint.

Why this matters: The entire value proposition of this PR is reducing RU cost by checkpointing less often. If the hasUncheckpointedProgress guard is accidentally removed, every interval tick would write a redundant checkpoint, defeating the purpose. A test that verifies "timer fires but no checkpoint called when no docs processed" would protect this invariant.

Suggested test:

@Test(groups = {"unit"})
public void intervalTimerDoesNotCheckpointWithoutProgress() throws Exception {
    // Setup with time interval
    AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(
        new CheckpointFrequency().withTimeInterval(Duration.ofMillis(30)), observer);
    autoCheckpointer.open(context);

    // Wait for 2+ interval ticks without processing any changes
    Thread.sleep(100);

    // Checkpoint should never have been called
    Mockito.verify(context, Mockito.never()).checkpoint();
    autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
}

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class);

Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty());
Mockito.when(context.checkpoint()).thenReturn(Mono.empty());

CheckpointFrequency checkpointFrequency = new CheckpointFrequency().withTimeInterval(Duration.ofMillis(50));
AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(checkpointFrequency, observer);
autoCheckpointer.open(context);

autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block();
Thread.sleep(150);

Mockito.verify(context, Mockito.atLeastOnce()).checkpoint();
autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
Comment on lines +78 to +83
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unit test uses Thread.sleep(...) to wait for Flux.interval ticks, which tends to be flaky and slow in CI. Prefer Reactor virtual time (e.g., VirtualTimeScheduler / StepVerifier.withVirtualTime) so the test can advance time deterministically without sleeping.

Copilot uses AI. Check for mistakes.
}

@Test(groups = {"unit"})
@SuppressWarnings("unchecked")
public void openFailureDoesNotStartIntervalTimer() throws Exception {
ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class);

Mockito.doThrow(new IllegalStateException("open failed")).when(observer).open(any());

AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(
new CheckpointFrequency().withTimeInterval(Duration.ofMillis(20)), observer);

Assert.assertThrows(IllegalStateException.class, () -> autoCheckpointer.open(context));

Disposable intervalDisposable = getField(autoCheckpointer, "intervalCheckpointDisposable");
Assert.assertNull(intervalDisposable);
}

@Test(groups = {"unit"})
@SuppressWarnings("unchecked")
public void intervalCheckpointFailureDoesNotTerminateIntervalStream() throws InterruptedException {
ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class);

Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty());
Mockito.when(context.checkpoint())
.thenReturn(Mono.error(new IllegalStateException("checkpoint failure")), Mono.empty(), Mono.empty());

AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(
new CheckpointFrequency().withTimeInterval(Duration.ofMillis(40)), observer);
autoCheckpointer.open(context);

autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block();
Thread.sleep(240);

Mockito.verify(context, Mockito.atLeast(2)).checkpoint();
autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN);
Comment on lines +116 to +121
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unit test relies on Thread.sleep(...) timing to validate retry behavior, which can be flaky under load. Prefer virtual time (Reactor VirtualTimeScheduler / StepVerifier.withVirtualTime) and advance time deterministically instead of sleeping.

Copilot uses AI. Check for mistakes.
}

@Test(groups = {"unit"})
@SuppressWarnings("unchecked")
public void checkpointSuccessDoesNotClearNewerProgressArrivingDuringInFlightCheckpoint() throws Exception {
ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class);
ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class);

Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty());

Sinks.One<Lease> firstCheckpoint = Sinks.one();
Mockito.when(context.checkpoint())
.thenReturn(firstCheckpoint.asMono(), Mono.just(Mockito.mock(Lease.class)));

AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(
new CheckpointFrequency().withTimeInterval(Duration.ofSeconds(1)), observer);

setField(autoCheckpointer, "lastCheckpointNanoTime", System.nanoTime() - Duration.ofSeconds(2).toNanos());

autoCheckpointer.processChanges(context, Collections.singletonList("batch-1")).subscribe();
autoCheckpointer.processChanges(context, Collections.singletonList("batch-2")).subscribe();

Mockito.verify(context, times(1)).checkpoint();

firstCheckpoint.tryEmitValue(Mockito.mock(Lease.class));
Thread.sleep(30);

setField(autoCheckpointer, "lastCheckpointNanoTime", System.nanoTime() - Duration.ofSeconds(2).toNanos());
invokeCheckpointIfIntervalElapsed(autoCheckpointer).block();

Comment on lines +146 to +151
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unit test uses Thread.sleep(...) to wait for async completion/interval evaluation, which can be flaky. Prefer virtual time and deterministic scheduling; if needed, consider making AutoCheckpointer accept a Scheduler for the interval so tests can use a VirtualTimeScheduler without sleeps.

Copilot uses AI. Check for mistakes.
Mockito.verify(context, times(2)).checkpoint();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

/**
* Base type for checkpoint strategies used by {@link ChangeFeedProcessor}.
*/
public abstract class ChangeFeedCheckpointStrategy {
ChangeFeedCheckpointStrategy() {
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,16 @@ private void validateChangeFeedProcessorOptions() {
// force a lot of resets and lead to a poor overall performance of ChangeFeedProcessor.
throw new IllegalArgumentException("changeFeedProcessorOptions: expecting leaseRenewInterval less than leaseExpirationInterval");
}

ChangeFeedCheckpointStrategy checkpointStrategy = this.changeFeedProcessorOptions.getCheckpointStrategy();
if (checkpointStrategy instanceof TimeIntervalCheckpointStrategy) {
if (((TimeIntervalCheckpointStrategy)checkpointStrategy).getMaxCheckpointDelay()
Comment thread
dibahlfi marked this conversation as resolved.
.compareTo(this.changeFeedProcessorOptions.getLeaseExpirationInterval()) >= 0) {
throw new IllegalArgumentException(
"changeFeedProcessorOptions: expecting checkpointStrategy.maxCheckpointDelay less than leaseExpirationInterval");
}
}

// Some extra checks for all versions and deletes mode
if (ChangeFeedMode.FULL_FIDELITY.equals(changeFeedMode)) {
if (this.changeFeedProcessorOptions.getStartTime() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

/**
* Checkpoint strategy that writes a checkpoint after each processed batch.
*/
public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation — API Design: EveryBatchCheckpointStrategy is semantically identical to null

public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy {

This public class has zero fields and zero methods. null already means "every batch" — CheckpointFrequencyFactory.fromOptions() treats both identically (returning new CheckpointFrequency()), and the getCheckpointStrategy() Javadoc says "returns null when default every-batch checkpointing is used."

Why this matters:

  1. Dual code paths: Every consumer dispatching on strategy type (the factory, the builder validation) must handle both null AND EveryBatchCheckpointStrategy for the same behavior.
  2. User confusion: Which should customers use — null (the default) or new EveryBatchCheckpointStrategy()?
  3. Precedent mismatch: The sibling AvailabilityStrategy hierarchy has only one subclass (ThresholdBasedAvailabilityStrategy) — no "default/no-op" marker class.
  4. API surface cost: Public final class is committed API. Removing it later is a breaking change.

Suggestion: Consider removing this class. null already conveys "use the default (every-batch)." If an explicit sentinel is needed for telemetry or logging, a static factory method like ChangeFeedCheckpointStrategy.everyBatch() could return a package-private marker without inflating the public API.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

/**
* Creates a new every-batch checkpoint strategy.
*/
public EveryBatchCheckpointStrategy() {
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos;

import java.time.Duration;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
* Checkpoint strategy that bounds the replay window by a maximum time interval.
*/
Comment thread
dibahlfi marked this conversation as resolved.
public final class TimeIntervalCheckpointStrategy extends ChangeFeedCheckpointStrategy {
private final Duration maxCheckpointDelay;

/**
* Creates a new time-interval checkpoint strategy.
*
* @param maxCheckpointDelay the maximum allowed replay window duration.
*/
public TimeIntervalCheckpointStrategy(Duration maxCheckpointDelay) {
checkNotNull(maxCheckpointDelay, "Argument 'maxCheckpointDelay' can not be null");
if (maxCheckpointDelay.isZero() || maxCheckpointDelay.isNegative()) {
throw new IllegalArgumentException("Argument 'maxCheckpointDelay' must be positive");
}

this.maxCheckpointDelay = maxCheckpointDelay;
}

/**
* Gets the maximum replay window.
*
* @return the maximum replay window.
*/
public Duration getMaxCheckpointDelay() {
return this.maxCheckpointDelay;
}
}

Loading