-
Notifications
You must be signed in to change notification settings - Fork 2.2k
CFP Time based checkpoint strategy #48780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos; | ||
|
|
||
| import com.azure.cosmos.models.ChangeFeedProcessorOptions; | ||
| import org.mockito.Mockito; | ||
| import org.testng.Assert; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| import java.lang.reflect.InvocationTargetException; | ||
| import java.lang.reflect.Method; | ||
| import java.time.Duration; | ||
|
|
||
| public class ChangeFeedCheckpointStrategyTests { | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| public void optionsDefaultCheckpointStrategyIsNull() { | ||
| ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions(); | ||
|
|
||
| Assert.assertNull(options.getCheckpointStrategy()); | ||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| public void setCheckpointStrategyNullThrows() { | ||
| ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions(); | ||
|
|
||
| Assert.assertThrows(NullPointerException.class, () -> options.setCheckpointStrategy(null)); | ||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| public void timeIntervalCheckpointStrategyRequiresPositiveDuration() { | ||
| Assert.assertThrows(NullPointerException.class, () -> new TimeIntervalCheckpointStrategy(null)); | ||
| Assert.assertThrows(IllegalArgumentException.class, () -> new TimeIntervalCheckpointStrategy(Duration.ZERO)); | ||
| Assert.assertThrows(IllegalArgumentException.class, () -> new TimeIntervalCheckpointStrategy(Duration.ofSeconds(-1))); | ||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| public void builderRejectsTimeIntervalCheckpointDelayGreaterThanOrEqualToLeaseExpirationInterval() { | ||
| ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions() | ||
| .setLeaseExpirationInterval(Duration.ofSeconds(30)) | ||
| .setCheckpointStrategy(new TimeIntervalCheckpointStrategy(Duration.ofSeconds(30))); | ||
|
|
||
| ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder() | ||
| .hostName("host-1") | ||
| .feedContainer(Mockito.mock(CosmosAsyncContainer.class)) | ||
| .leaseContainer(Mockito.mock(CosmosAsyncContainer.class)) | ||
| .handleChanges(changes -> { | ||
| }) | ||
| .options(options); | ||
|
|
||
| Assert.assertThrows(IllegalArgumentException.class, builder::buildChangeFeedProcessor); | ||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| public void builderAcceptsTimeIntervalCheckpointDelayLessThanLeaseExpirationInterval() { | ||
| ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions() | ||
| .setLeaseExpirationInterval(Duration.ofSeconds(30)) | ||
| .setCheckpointStrategy(new TimeIntervalCheckpointStrategy(Duration.ofSeconds(10))); | ||
|
|
||
| ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder() | ||
| .hostName("host-1") | ||
| .feedContainer(Mockito.mock(CosmosAsyncContainer.class)) | ||
| .leaseContainer(Mockito.mock(CosmosAsyncContainer.class)) | ||
| .handleChanges(changes -> { | ||
| if (changes == null) { | ||
| throw new IllegalStateException("changes should not be null"); | ||
| } | ||
| }) | ||
| .options(options); | ||
|
|
||
| try { | ||
| Method method = ChangeFeedProcessorBuilder.class.getDeclaredMethod("validateChangeFeedProcessorOptions"); | ||
| method.setAccessible(true); | ||
| method.invoke(builder); | ||
| } catch (InvocationTargetException e) { | ||
| Assert.fail("Expected no exception but got: " + e.getCause(), e.getCause()); | ||
| } catch (Exception e) { | ||
| Assert.fail("Expected no exception but got: " + e.getMessage(), e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,154 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos.implementation.changefeed.common; | ||
|
|
||
| import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver; | ||
| import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverCloseReason; | ||
| import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext; | ||
| import com.azure.cosmos.implementation.changefeed.CheckpointFrequency; | ||
| import com.azure.cosmos.implementation.changefeed.Lease; | ||
| import org.mockito.Mockito; | ||
| import org.testng.Assert; | ||
| import org.testng.annotations.Test; | ||
| import reactor.core.Disposable; | ||
| import reactor.core.publisher.Mono; | ||
| import reactor.core.publisher.Sinks; | ||
|
|
||
| import java.lang.reflect.Field; | ||
| import java.lang.reflect.Method; | ||
| import java.time.Duration; | ||
| import java.util.Collections; | ||
|
|
||
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.Mockito.times; | ||
|
|
||
| public class AutoCheckpointerTests { | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private static <T> T getField(Object target, String fieldName) throws Exception { | ||
| Field field = target.getClass().getDeclaredField(fieldName); | ||
| field.setAccessible(true); | ||
| return (T) field.get(target); | ||
| } | ||
|
|
||
| private static void setField(Object target, String fieldName, Object value) throws Exception { | ||
| Field field = target.getClass().getDeclaredField(fieldName); | ||
| field.setAccessible(true); | ||
| field.set(target, value); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private static Mono<Void> invokeCheckpointIfIntervalElapsed(AutoCheckpointer<?> autoCheckpointer) throws Exception { | ||
| Method method = AutoCheckpointer.class.getDeclaredMethod("checkpointIfIntervalElapsed"); | ||
| method.setAccessible(true); | ||
| return (Mono<Void>) method.invoke(autoCheckpointer); | ||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| @SuppressWarnings("unchecked") | ||
| public void everyBatchCheckpointStrategyCheckpointsAfterEachBatch() { | ||
| ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class); | ||
| ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class); | ||
|
|
||
| Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty()); | ||
| Mockito.when(context.checkpoint()).thenReturn(Mono.empty()); | ||
|
|
||
| AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(new CheckpointFrequency(), observer); | ||
| autoCheckpointer.open(context); | ||
|
|
||
| autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block(); | ||
|
|
||
| Mockito.verify(context, times(1)).checkpoint(); | ||
| autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN); | ||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| @SuppressWarnings("unchecked") | ||
| public void timeIntervalCheckpointStrategyUsesBackgroundTimer() throws InterruptedException { | ||
| ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class); | ||
| ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class); | ||
|
|
||
| Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty()); | ||
| Mockito.when(context.checkpoint()).thenReturn(Mono.empty()); | ||
|
|
||
| CheckpointFrequency checkpointFrequency = new CheckpointFrequency().withTimeInterval(Duration.ofMillis(50)); | ||
| AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>(checkpointFrequency, observer); | ||
| autoCheckpointer.open(context); | ||
|
|
||
| autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block(); | ||
| Thread.sleep(150); | ||
|
|
||
| Mockito.verify(context, Mockito.atLeastOnce()).checkpoint(); | ||
| autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN); | ||
|
Comment on lines
+78
to
+83
|
||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| @SuppressWarnings("unchecked") | ||
| public void openFailureDoesNotStartIntervalTimer() throws Exception { | ||
| ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class); | ||
| ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class); | ||
|
|
||
| Mockito.doThrow(new IllegalStateException("open failed")).when(observer).open(any()); | ||
|
|
||
| AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>( | ||
| new CheckpointFrequency().withTimeInterval(Duration.ofMillis(20)), observer); | ||
|
|
||
| Assert.assertThrows(IllegalStateException.class, () -> autoCheckpointer.open(context)); | ||
|
|
||
| Disposable intervalDisposable = getField(autoCheckpointer, "intervalCheckpointDisposable"); | ||
| Assert.assertNull(intervalDisposable); | ||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| @SuppressWarnings("unchecked") | ||
| public void intervalCheckpointFailureDoesNotTerminateIntervalStream() throws InterruptedException { | ||
| ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class); | ||
| ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class); | ||
|
|
||
| Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty()); | ||
| Mockito.when(context.checkpoint()) | ||
| .thenReturn(Mono.error(new IllegalStateException("checkpoint failure")), Mono.empty(), Mono.empty()); | ||
|
|
||
| AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>( | ||
| new CheckpointFrequency().withTimeInterval(Duration.ofMillis(40)), observer); | ||
| autoCheckpointer.open(context); | ||
|
|
||
| autoCheckpointer.processChanges(context, Collections.singletonList("doc")).block(); | ||
| Thread.sleep(240); | ||
|
|
||
| Mockito.verify(context, Mockito.atLeast(2)).checkpoint(); | ||
| autoCheckpointer.close(context, ChangeFeedObserverCloseReason.SHUTDOWN); | ||
|
Comment on lines
+116
to
+121
|
||
| } | ||
|
|
||
| @Test(groups = {"unit"}) | ||
| @SuppressWarnings("unchecked") | ||
| public void checkpointSuccessDoesNotClearNewerProgressArrivingDuringInFlightCheckpoint() throws Exception { | ||
| ChangeFeedObserver<String> observer = Mockito.mock(ChangeFeedObserver.class); | ||
| ChangeFeedObserverContext<String> context = Mockito.mock(ChangeFeedObserverContext.class); | ||
|
|
||
| Mockito.when(observer.processChanges(any(), any())).thenReturn(Mono.empty()); | ||
|
|
||
| Sinks.One<Lease> firstCheckpoint = Sinks.one(); | ||
| Mockito.when(context.checkpoint()) | ||
| .thenReturn(firstCheckpoint.asMono(), Mono.just(Mockito.mock(Lease.class))); | ||
|
|
||
| AutoCheckpointer<String> autoCheckpointer = new AutoCheckpointer<>( | ||
| new CheckpointFrequency().withTimeInterval(Duration.ofSeconds(1)), observer); | ||
|
|
||
| setField(autoCheckpointer, "lastCheckpointNanoTime", System.nanoTime() - Duration.ofSeconds(2).toNanos()); | ||
|
|
||
| autoCheckpointer.processChanges(context, Collections.singletonList("batch-1")).subscribe(); | ||
| autoCheckpointer.processChanges(context, Collections.singletonList("batch-2")).subscribe(); | ||
|
|
||
| Mockito.verify(context, times(1)).checkpoint(); | ||
|
|
||
| firstCheckpoint.tryEmitValue(Mockito.mock(Lease.class)); | ||
| Thread.sleep(30); | ||
|
|
||
| setField(autoCheckpointer, "lastCheckpointNanoTime", System.nanoTime() - Duration.ofSeconds(2).toNanos()); | ||
| invokeCheckpointIfIntervalElapsed(autoCheckpointer).block(); | ||
|
|
||
|
Comment on lines
+146
to
+151
|
||
| Mockito.verify(context, times(2)).checkpoint(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos; | ||
|
|
||
| /** | ||
| * Base type for checkpoint strategies used by {@link ChangeFeedProcessor}. | ||
| */ | ||
| public abstract class ChangeFeedCheckpointStrategy { | ||
| ChangeFeedCheckpointStrategy() { | ||
| } | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos; | ||
|
|
||
| /** | ||
| * Checkpoint strategy that writes a checkpoint after each processed batch. | ||
| */ | ||
| public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 Recommendation — API Design: public final class EveryBatchCheckpointStrategy extends ChangeFeedCheckpointStrategy {This public class has zero fields and zero methods. Why this matters:
Suggestion: Consider removing this class. |
||
| /** | ||
| * Creates a new every-batch checkpoint strategy. | ||
| */ | ||
| public EveryBatchCheckpointStrategy() { | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // Licensed under the MIT License. | ||
|
|
||
| package com.azure.cosmos; | ||
|
|
||
| import java.time.Duration; | ||
|
|
||
| import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; | ||
|
|
||
| /** | ||
| * Checkpoint strategy that bounds the replay window by a maximum time interval. | ||
| */ | ||
|
dibahlfi marked this conversation as resolved.
|
||
| public final class TimeIntervalCheckpointStrategy extends ChangeFeedCheckpointStrategy { | ||
| private final Duration maxCheckpointDelay; | ||
|
|
||
| /** | ||
| * Creates a new time-interval checkpoint strategy. | ||
| * | ||
| * @param maxCheckpointDelay the maximum allowed replay window duration. | ||
| */ | ||
| public TimeIntervalCheckpointStrategy(Duration maxCheckpointDelay) { | ||
| checkNotNull(maxCheckpointDelay, "Argument 'maxCheckpointDelay' can not be null"); | ||
| if (maxCheckpointDelay.isZero() || maxCheckpointDelay.isNegative()) { | ||
| throw new IllegalArgumentException("Argument 'maxCheckpointDelay' must be positive"); | ||
| } | ||
|
|
||
| this.maxCheckpointDelay = maxCheckpointDelay; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the maximum replay window. | ||
| * | ||
| * @return the maximum replay window. | ||
| */ | ||
| public Duration getMaxCheckpointDelay() { | ||
| return this.maxCheckpointDelay; | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟢 Suggestion — Testing: Missing "no progress = no checkpoint" test
The
timeIntervalCheckpointStrategyUsesBackgroundTimertest always processes a document before waiting, sohasUncheckpointedProgressis alwaystrue. There's no test verifying the guard at line 141 (!this.hasUncheckpointedProgress) — the core mechanism that prevents spurious checkpoint writes when no new batches have arrived since the last checkpoint.Why this matters: The entire value proposition of this PR is reducing RU cost by checkpointing less often. If the
hasUncheckpointedProgressguard is accidentally removed, every interval tick would write a redundant checkpoint, defeating the purpose. A test that verifies "timer fires but no checkpoint called when no docs processed" would protect this invariant.Suggested test: