diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md index 5045991f3542..7e2ef0f18fb4 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md @@ -8,6 +8,7 @@ #### Bugs Fixed * Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086) +* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md index 777177708547..48c15798f4ae 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md @@ -8,6 +8,7 @@ #### Bugs Fixed * Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086) +* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md index 1360614308a3..85d6f36f2b1b 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md @@ -8,6 +8,7 @@ #### Bugs Fixed * Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086) +* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md index 6674b6f8bb74..e7561d05958e 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md @@ -8,6 +8,7 @@ #### Bugs Fixed * Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086) +* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala index 5d83a5139ef2..f35b580a0fa7 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala @@ -214,6 +214,12 @@ private case class ChangeFeedPartitionReader .setEndLSN(options, this.partition.endLsn.get) } + // Bubble empty pages up to the iterator so the per-page end-to-end timeout + // applies to each individual page rather than being exceeded by serial + // empty-page drains inside ChangeFeedFetcher. + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor + .setAllowNotModifiedPages(options, true) + options.setCustomItemSerializer(itemDeserializer) } diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala index 44027bafbe7e..7739485bbc91 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala @@ -45,6 +45,14 @@ private case class ItemsPartitionReader .getCosmosQueryRequestOptionsAccessor .disallowQueryPlanRetrieval(new CosmosQueryRequestOptions()) + // Bubble empty pages up to the iterator so the per-page end-to-end timeout + // applies to each individual page rather than being exceeded by serial + // empty-page drains inside ParallelDocumentQueryExecutionContext. + ImplementationBridgeHelpers + .CosmosQueryRequestOptionsHelper + .getCosmosQueryRequestOptionsAccessor + .setAllowEmptyPages(queryOptions, true) + private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config) ThroughputControlHelper.populateThroughputControlGroupName( ImplementationBridgeHelpers diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala index b8400fdd3eff..9185e7529d35 100644 --- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala @@ -180,6 +180,68 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr factoryCallCount.get shouldEqual 1 } + "TransientIOErrors" should "drain long runs of empty pages without hitting the end-to-end timeout" in { + // Regression test for the empty-page drain scenario: when the SDK is configured with + // emptyPagesAllowed=true the iterator must surface many consecutive empty + // pages without busy-waiting beyond the per-page end-to-end timeout. Even + // with hundreds of empty pages followed by real data, the iterator should + // return all real rows. + val emptyLeadingPages = 200 + val realPages = 5 + val totalPages = emptyLeadingPages + realPages + val iterator = new TransientIOErrorsRetryingIterator( + continuationToken => generateMockedCosmosPagedFluxWithEmptyPrefix( + continuationToken, totalPages, emptyLeadingPages), + pageSize, + 1, + None, + None + ) + iterator.maxRetryIntervalInMs = 5 + + // 2 producers (Left/Right) each emit realPages * pageSize rows + iterator.count(_ => true) shouldEqual (realPages * pageSize * 2) + } + + private def generateMockedCosmosPagedFluxWithEmptyPrefix + ( + continuationToken: String, + initialPageCount: Int, + leadingEmptyPageCount: Int + ) = { + + val leftProducer = generateFeedResponseFluxWithEmptyPrefix( + "Left", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken)) + val rightProducer = generateFeedResponseFluxWithEmptyPrefix( + "Right", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken)) + val toBeMerged = Array(leftProducer, rightProducer).toIterable.asJava + val mergedFlux = Flux.mergeSequential(toBeMerged, 1, 2) + UtilBridgeInternal.createCosmosPagedFlux(_ => mergedFlux) + } + + private def generateFeedResponseFluxWithEmptyPrefix + ( + prefix: String, + pageCount: Int, + leadingEmptyPageCount: Int, + requestContinuationToken: Option[String] + ): Flux[FeedResponse[SparkRowItem]] = { + + // generateFeedResponse uses documentStartIndex=-1 as the "emit an empty page" sentinel. + val emptyPageSentinel = -1 + val firstDataPageStartIndex = 1 + + val responses = Array.range(1, pageCount + 1) + .map(i => generateFeedResponse( + prefix, + i, + if (i <= leadingEmptyPageCount) emptyPageSentinel else firstDataPageStartIndex)) + .filter(response => requestContinuationToken.isEmpty || + requestContinuationToken.get < response.getContinuationToken) + + Flux.fromArray(responses) + } + private val objectMapper = new ObjectMapper @throws[JsonProcessingException] diff --git a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md index 9b87a3fcf675..32117a8d7d32 100644 --- a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md @@ -8,6 +8,7 @@ #### Bugs Fixed * Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086) +* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md index 570aec149b2c..d1afe6f098ff 100644 --- a/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md @@ -8,6 +8,7 @@ #### Bugs Fixed * Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086) +* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java index 7c8418ed8992..29355ae002c3 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java @@ -332,8 +332,14 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro } } - @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT) + @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", + timeOut = TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception { + // Note on shape: this test verifies Reactor's prefetch behavior on the change-feed + // byPage stream. The two fire-and-forget `.subscribe()` calls + `Thread.sleep(3000)` + // are intentional — they exercise the prefetch path without backpressure-bounded + // collection. retryAnalyzer = FlakyTestRetryAnalyzer absorbs occasional slow-runner + // jitter (Windows EmulatorTcp Java 8 can take >3s to deliver the first 3 pages). this.createContainer( (cp) -> { if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) { @@ -1097,6 +1103,78 @@ public void changeFeedQueryCompleteAfterAvailableNow( } } + @Test(groups = { "emulator" }, timeOut = TIMEOUT * 5) + public void changeFeedQuery_notModifiedPagesAllowed_surfacesNoChangesPagesAndTerminates() { + // End-to-end guard: when the SDK is opted into notModifiedPagesAllowed=true + // (via the friend-API bridge accessor — the same opt-in the Cosmos Spark connector uses), + // change-feed reads against a multi-partition container must: + // (a) surface 304/noChanges pages individually to the caller, AND + // (b) terminate via the FeedRangeCompositeContinuationImpl >4*(size+1) consecutive-304 + // defense rather than poll indefinitely. + // + // This is the integration-level pin for the contract that ChangeFeedFetcher.nextPageInternal + // branch 2 explicitly calls disableShouldFetchMore() on NO_RETRY noChanges. Without that + // arm, a caller that drained the flux to completion would hang. + String testContainerId = UUID.randomUUID().toString(); + try { + CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk"); + CosmosAsyncContainer testContainer = + createCollection( + this.createdAsyncDatabase, + containerProperties, + new CosmosContainerRequestOptions(), + // throughput high enough to provision multiple physical partitions + 11000); + + // Sparse workload: a few docs spread across partitions; most physical partitions + // will return 304 / noChanges on read, exercising the empty-page surfacing path. + insertDocuments(/* partitionCount */ 3, /* documentCount */ 2, testContainer); + + CosmosChangeFeedRequestOptions options = + CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange()); + ImplementationBridgeHelpers + .CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .setAllowNotModifiedPages(options, true); + + AtomicInteger totalPagesObserved = new AtomicInteger(0); + AtomicInteger totalDocsObserved = new AtomicInteger(0); + AtomicInteger notModifiedPagesObserved = new AtomicInteger(0); + + // Drain a bounded slice of the change feed - the iteration must terminate within + // a reasonable page count via the SDK's consecutive-304 defense. + testContainer.queryChangeFeed(options, JsonNode.class) + .byPage(1) + .take(100) + .doOnNext(response -> { + totalPagesObserved.incrementAndGet(); + int pageSize = response.getResults().size(); + totalDocsObserved.addAndGet(pageSize); + if (pageSize == 0) { + notModifiedPagesObserved.incrementAndGet(); + } + }) + .blockLast(); + + // (a) at least some empty pages must have surfaced - the whole point of the opt-in + assertThat(notModifiedPagesObserved.get()) + .describedAs("notModifiedPagesAllowed=true must surface 304/noChanges pages individually") + .isGreaterThan(0); + // (b) all inserted docs must be observed - empty-page surfacing must not interfere + // with data-page emission + assertThat(totalDocsObserved.get()) + .describedAs("all inserted documents must surface") + .isEqualTo(6); + // (c) iteration must have terminated (we didn't hit the take(100) cap, otherwise + // we'd be polling indefinitely - that's the regression the defense-in-depth arm prevents) + assertThat(totalPagesObserved.get()) + .describedAs("iteration must terminate via consecutive-304 defense, not hit the take(100) cap") + .isLessThan(100); + } finally { + safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId)); + } + } + void insertDocuments( int partitionCount, int documentCount) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java new file mode 100644 index 000000000000..818c16c6277b --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java @@ -0,0 +1,184 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation; + +import com.azure.cosmos.CosmosItemSerializer; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1; +import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.ModelBridgeInternal; +import org.testng.annotations.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for the paged-flux pull continuation path on + * {@link CosmosChangeFeedRequestOptions#withCosmosPagedFluxOptions(CosmosPagedFluxOptions)} (package-visible via + * {@link ModelBridgeInternal#getEffectiveChangeFeedRequestOptions(CosmosChangeFeedRequestOptions, CosmosPagedFluxOptions)}). + * + *

That method silently builds a brand-new {@code CosmosChangeFeedRequestOptionsImpl} when the caller supplies a + * continuation token via {@link CosmosPagedFluxOptions}, so any field NOT explicitly copied is dropped. These tests + * lock in the propagation of fields whose loss would silently break a feature. + */ +public class CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest { + + @Test(groups = { "unit" }) + public void notModifiedPagesAllowed_isPropagated_whenContinuationTokenSupplied() { + // GIVEN a CosmosChangeFeedRequestOptions with notModifiedPagesAllowed=true (the value the Spark connector sets) + CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .setAllowNotModifiedPages(src, true); + + // AND a continuation token supplied via the paged-flux pull mechanism + CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); + pagedFluxOptions.setRequestContinuation(buildContinuationToken()); + + // WHEN computing the effective options + CosmosChangeFeedRequestOptions effective = ModelBridgeInternal + .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); + + // THEN notModifiedPagesAllowed must be preserved on the freshly-built impl + assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .getAllowNotModifiedPages(effective)) + .describedAs("notModifiedPagesAllowed must survive the paged-flux pull continuation rebuild") + .isTrue(); + } + + @Test(groups = { "unit" }) + public void notModifiedPagesAllowedFalse_isPropagated_whenContinuationTokenSupplied() { + // The default value should also round-trip cleanly (sanity check that we're not just hard-coding true). + CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); + + CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); + pagedFluxOptions.setRequestContinuation(buildContinuationToken()); + + CosmosChangeFeedRequestOptions effective = ModelBridgeInternal + .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); + + assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .getAllowNotModifiedPages(effective)) + .describedAs("notModifiedPagesAllowed default (false) must survive the paged-flux pull continuation rebuild") + .isFalse(); + } + + @Test(groups = { "unit" }) + public void notModifiedPagesAllowed_isPreserved_whenNoContinuationTokenSupplied() { + // No continuation → withCosmosPagedFluxOptions returns `this` unchanged. + CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .setAllowNotModifiedPages(src, true); + + CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); + pagedFluxOptions.setMaxItemCount(50); + + CosmosChangeFeedRequestOptions effective = ModelBridgeInternal + .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); + + assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .getAllowNotModifiedPages(effective)) + .isTrue(); + } + + @Test(groups = { "unit" }) + public void endLSN_isPropagated_whenContinuationTokenSupplied() { + // Locks in the bounded-change-feed contract across a byPage(savedContinuation) round-trip: + // a caller who set endLSN=42 must continue to see iteration bounded by LSN 42 after resume. + // Before the inheritNonContinuationFieldsFrom fix, endLSN was silently dropped on the rebuild + // path, turning a bounded change feed into an unbounded one. + CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); + ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .setEndLSN(src, 42L); + + CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); + pagedFluxOptions.setRequestContinuation(buildContinuationToken()); + + CosmosChangeFeedRequestOptions effective = ModelBridgeInternal + .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); + + assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .getEndLSN(effective)) + .describedAs("endLSN must survive the paged-flux pull continuation rebuild") + .isEqualTo(42L); + } + + @Test(groups = { "unit" }) + public void customSerializer_isPropagated_whenContinuationTokenSupplied() { + // Locks in custom-serializer preservation across a byPage(savedContinuation) round-trip: + // a caller who registered a custom CosmosItemSerializer must continue to see items + // deserialized through that serializer after resume. Before the inheritNonContinuationFieldsFrom + // fix, the customSerializer was silently dropped on the rebuild path, falling back to the + // SDK's internal default serializer and potentially producing wrong field values. + CosmosItemSerializer sentinel = new CosmosItemSerializer() { + @Override + public java.util.Map serialize(T item) { return null; } + + @Override + public T deserialize(java.util.Map jsonNodeMap, Class classType) { return null; } + }; + CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); + src.setCustomItemSerializer(sentinel); + + CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); + pagedFluxOptions.setRequestContinuation(buildContinuationToken()); + + CosmosChangeFeedRequestOptions effective = ModelBridgeInternal + .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); + + assertThat(effective.getCustomItemSerializer()) + .describedAs("customSerializer must survive the paged-flux pull continuation rebuild") + .isSameAs(sentinel); + } + + @Test(groups = { "unit" }) + public void tokenEncodedFields_overrideCallerSuppliedValues_whenContinuationTokenSupplied() { + // Negative pin: the four token-encoded fields (continuationState, feedRangeInternal, mode, + // startFromInternal) MUST come from the token, not from the caller's pre-resume options. + // The caller's options here have continuationState=null (createForProcessingFromBeginning), + // but the resulting effective options must have a non-null continuationState parsed from + // the supplied token. If a future refactor accidentally inherits the token-encoded fields + // from the source impl (e.g. moving them into inheritNonContinuationFieldsFrom), this test + // catches the regression because the source's continuationState would clobber the token's. + CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions + .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange()); + + CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions(); + pagedFluxOptions.setRequestContinuation(buildContinuationToken()); + + CosmosChangeFeedRequestOptions effective = ModelBridgeInternal + .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions); + + assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper + .getCosmosChangeFeedRequestOptionsAccessor() + .getImpl(effective) + .getContinuation()) + .describedAs("continuationState is encoded in the token and MUST override the caller's pre-resume value") + .isNotNull(); + } + + private static String buildContinuationToken() { + // Build a real ChangeFeedState so we can serialize a valid (base64-encoded) continuation token. + // We use the state's own toString() which round-trips through createForProcessingFromContinuation. + ChangeFeedStateV1 state = new ChangeFeedStateV1( + "someContainerRid", + FeedRangeEpkImpl.forFullRange(), + ChangeFeedMode.INCREMENTAL, + ChangeFeedStartFromInternal.createFromBeginning(), + null); + return state.toString(); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherNotModifiedPagesTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherNotModifiedPagesTest.java new file mode 100644 index 000000000000..b917f55864a7 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherNotModifiedPagesTest.java @@ -0,0 +1,456 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.query; + +import com.azure.cosmos.implementation.Document; +import com.azure.cosmos.implementation.DocumentClientRetryPolicy; +import com.azure.cosmos.implementation.DocumentServiceRequestContext; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.IRetryPolicyFactory; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.ShouldRetryResult; +import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState; +import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext; +import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation; +import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl; +import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; +import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.ModelBridgeInternal; +import org.mockito.Mockito; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link ChangeFeedFetcher} covering the {@code notModifiedPagesAllowed} behavior change. + * + *

The two behaviors locked in here are: + *

    + *
  1. {@code isFullyDrained} consults only the continuation (no {@code noChanges} short-circuit), + * which is what allows {@code notModifiedPagesAllowed=true} to surface empty pages without first + * having to call {@code reEnableShouldFetchMoreForRetry()} to undo a base-class decision.
  2. + *
  3. When {@code notModifiedPagesAllowed=true}, {@code nextPageInternal} returns + * {@code Mono.just(noChangesResponse)} instead of {@code Mono.empty()} so the empty pages + * bubble up to the caller (Spark connector) where the per-page end-to-end timeout applies to + * each individual page rather than being exceeded by serial empty-page drains.
  4. + *
+ */ +public class ChangeFeedFetcherNotModifiedPagesTest { + + @Test(groups = { "unit" }) + public void isFullyDrained_noChangesResponseWithNotModifiedPagesAllowedTrue_returnsTrue() { + // GIVEN a ChangeFeedFetcher with notModifiedPagesAllowed=true and a noChanges response + // whose continuation reports !isDone() + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ true); + + FeedResponse noChangesResponse = changeFeedNoChanges("token-A"); + + // WHEN + boolean drained = invokeIsFullyDrained(fetcher, noChangesResponse); + + // THEN: isFullyDrained short-circuits on noChanges regardless of the + // notModifiedPagesAllowed flag. With notModifiedPagesAllowed=true the bottom of + // nextPageInternal undoes the side effect via reEnableShouldFetchMoreForRetry() + // so the iteration continues; with notModifiedPagesAllowed=false the + // shouldFetchMore=false sticks and the iteration terminates. + assertThat(drained).isTrue(); + } + + @Test(groups = { "unit" }) + public void isFullyDrained_noChangesResponseWithFinishedContinuation_returnsTrue() { + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(true); + ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ true); + + FeedResponse noChangesResponse = changeFeedNoChanges("token-B"); + + assertThat(invokeIsFullyDrained(fetcher, noChangesResponse)).isTrue(); + } + + @Test(groups = { "unit" }) + public void isFullyDrained_realResponseWithFinishedContinuation_returnsTrue() { + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(true); + ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ false); + + FeedResponse dataResponse = changeFeedDataPage("token-C", new Document()); + + assertThat(invokeIsFullyDrained(fetcher, dataResponse)).isTrue(); + } + + @Test(groups = { "unit" }) + public void isFullyDrained_noChangesResponseWithNotModifiedPagesAllowedFalse_returnsTrue() { + // Regression test: with the default notModifiedPagesAllowed=false the noChanges short-circuit + // MUST stay in place. Otherwise any non-Spark caller that drains the change feed flux + // (e.g. queryChangeFeed(...).byPage().toIterable().iterator()) would loop forever, + // because FeedRangeCompositeContinuationImpl.isDone() returns + // compositeContinuationTokens.size()==0, which is permanently false for incremental + // change feed (moveToNextToken rotates the deque, never shrinks it). The + // NO_RETRY result from handleChangeFeedNotModified is the only termination signal, + // and Paginator only honors it via Fetcher.shouldFetchMore() being flipped by + // isFullyDrained=true on this noChanges page. + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ false); + + FeedResponse noChangesResponse = changeFeedNoChanges("token-D"); + + assertThat(invokeIsFullyDrained(fetcher, noChangesResponse)) + .describedAs("With notModifiedPagesAllowed=false the noChanges short-circuit must remain to terminate iteration") + .isTrue(); + // The short-circuit must fire WITHOUT consulting the continuation; if a future + // refactor accidentally drops the noChanges check and falls through to + // continuation.isDone() (which is permanently false in incremental change feed), + // this verify catches it as a loud failure rather than a hard-to-diagnose hang. + Mockito.verify(continuation, Mockito.never()).isDone(); + } + + @Test(groups = { "unit" }, timeOut = 10_000) + public void nextPage_notModifiedPagesAllowedTrue_surfacesNoChangesPagesIndividually() { + // Scenario: 3 consecutive noChanges (304) pages from the same sub-feedRange, + // followed by a real data page. With notModifiedPagesAllowed=true, each of the 4 + // physical responses must surface as its own Mono emission so the Spark + // iterator can drain them under the per-page end-to-end timeout window. + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + // handleChangeFeedNotModified returns RETRY_NOW only for actual noChanges responses + // (matching the real FeedRangeCompositeContinuationImpl behavior). + when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> { + FeedResponse rsp = invocation.getArgument(0); + return ModelBridgeInternal.noChanges(rsp) ? ShouldRetryResult.RETRY_NOW : ShouldRetryResult.noRetry(); + }); + + FeedResponse noChanges1 = changeFeedNoChanges("t1"); + FeedResponse noChanges2 = changeFeedNoChanges("t2"); + FeedResponse noChanges3 = changeFeedNoChanges("t3"); + FeedResponse data = changeFeedDataPage("t4", new Document()); + + FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, noChanges3, data }; + AtomicInteger callIndex = new AtomicInteger(); + Function>> executeFunc = + req -> Mono.just(script[callIndex.getAndIncrement()]); + + ChangeFeedFetcher fetcher = + newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, executeFunc); + + // Drive the fetcher across 4 nextPage() invocations and assert each one surfaces. + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges1).verifyComplete(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges2).verifyComplete(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges3).verifyComplete(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data).verifyComplete(); + + assertThat(callIndex.get()).describedAs("executeFunc should have been called once per surfaced page").isEqualTo(4); + } + + @Test(groups = { "unit" }, timeOut = 10_000) + public void nextPage_notModifiedPagesAllowedTrueWithNoRetryOnNoChanges_terminatesIteration() { + // Defense-in-depth: with notModifiedPagesAllowed=true, isFullyDrained() consults only + // continuation.isDone() (permanently false in incremental change feed), so the + // SDK's own termination signal would otherwise be lost. nextPageInternal must + // explicitly disableShouldFetchMore() when handleChangeFeedNotModified returns + // NO_RETRY on a noChanges page (single-partition case, multi-partition full + // cycle complete, or the >4*(size+1) consecutive-304 defense). + // + // This test scripts: 3 noChanges with RETRY_NOW (mid-cycle), followed by a 4th + // noChanges with NO_RETRY (terminal). All 4 must surface, and shouldFetchMore() + // must be false after the terminal page so Paginator's outer loop stops. + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + + FeedResponse mid1 = changeFeedNoChanges("t1"); + FeedResponse mid2 = changeFeedNoChanges("t2"); + FeedResponse mid3 = changeFeedNoChanges("t3"); + FeedResponse terminal = changeFeedNoChanges("t4"); + + // The continuation distinguishes terminal from mid-cycle by reference identity. + when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> { + FeedResponse rsp = invocation.getArgument(0); + return rsp == terminal ? ShouldRetryResult.noRetry() : ShouldRetryResult.RETRY_NOW; + }); + + FeedResponse[] script = new FeedResponse[] { mid1, mid2, mid3, terminal }; + AtomicInteger callIndex = new AtomicInteger(); + Function>> executeFunc = + req -> Mono.just(script[callIndex.getAndIncrement()]); + + ChangeFeedFetcher fetcher = + newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, executeFunc); + + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == mid1).verifyComplete(); + assertThat(fetcher.shouldFetchMore()).describedAs("after mid1").isTrue(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == mid2).verifyComplete(); + assertThat(fetcher.shouldFetchMore()).describedAs("after mid2").isTrue(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == mid3).verifyComplete(); + assertThat(fetcher.shouldFetchMore()).describedAs("after mid3").isTrue(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == terminal).verifyComplete(); + assertThat(fetcher.shouldFetchMore()) + .describedAs("NO_RETRY on terminal noChanges page MUST stop Paginator from polling again") + .isFalse(); + assertThat(callIndex.get()) + .describedAs("after NO_RETRY termination, executeFunc must NOT be called again") + .isEqualTo(4); + } + + @Test(groups = { "unit" }, timeOut = 10_000) + public void nextPage_notModifiedPagesAllowedTrueWithDataPages_doesNotTerminate() { + // Defense-in-depth contract guard: handleChangeFeedNotModified in the real + // FeedRangeCompositeContinuationImpl returns NO_RETRY for EVERY non-noChanges + // response (the early `if (!noChanges(r))` clause resets state and then falls + // through to the final `return NO_RETRY`). The branch-2 termination logic in + // ChangeFeedFetcher.nextPageInternal therefore MUST gate the + // disableShouldFetchMore() call on `noChanges(r) && notModifiedPagesAllowed`; if a + // future refactor drops the noChanges(r) guard, every data page would silently + // truncate iteration after the first emission. This test pins that contract. + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + // Match real production behavior: NO_RETRY on data pages, RETRY_NOW on noChanges. + when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> { + FeedResponse rsp = invocation.getArgument(0); + return ModelBridgeInternal.noChanges(rsp) ? ShouldRetryResult.RETRY_NOW : ShouldRetryResult.noRetry(); + }); + + FeedResponse data1 = changeFeedDataPage("d1", new Document()); + FeedResponse data2 = changeFeedDataPage("d2", new Document()); + FeedResponse data3 = changeFeedDataPage("d3", new Document()); + + FeedResponse[] script = new FeedResponse[] { data1, data2, data3 }; + AtomicInteger callIndex = new AtomicInteger(); + Function>> executeFunc = + req -> Mono.just(script[callIndex.getAndIncrement()]); + + ChangeFeedFetcher fetcher = + newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, executeFunc); + + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data1).verifyComplete(); + assertThat(fetcher.shouldFetchMore()) + .describedAs("data pages must NOT terminate iteration even when handleChangeFeedNotModified returns NO_RETRY") + .isTrue(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data2).verifyComplete(); + assertThat(fetcher.shouldFetchMore()).describedAs("after data2").isTrue(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data3).verifyComplete(); + assertThat(fetcher.shouldFetchMore()).describedAs("after data3").isTrue(); + } + + @Test(groups = { "unit" }, timeOut = 10_000) + public void nextPage_notModifiedPagesAllowedFalse_swallowsNoChangesPagesUntilData() { + // Same scenario, but with the default notModifiedPagesAllowed=false. The + // 3 noChanges responses should be swallowed via repeatWhenEmpty and + // only the data page should surface from a SINGLE nextPage() call. + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> { + FeedResponse rsp = invocation.getArgument(0); + return ModelBridgeInternal.noChanges(rsp) ? ShouldRetryResult.RETRY_NOW : ShouldRetryResult.noRetry(); + }); + + FeedResponse noChanges1 = changeFeedNoChanges("t1"); + FeedResponse noChanges2 = changeFeedNoChanges("t2"); + FeedResponse noChanges3 = changeFeedNoChanges("t3"); + FeedResponse data = changeFeedDataPage("t4", new Document()); + + FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, noChanges3, data }; + AtomicInteger callIndex = new AtomicInteger(); + Function>> executeFunc = + req -> Mono.just(script[callIndex.getAndIncrement()]); + + ChangeFeedFetcher fetcher = + newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ false, executeFunc); + + // A single nextPage() should internally drain all 3 noChanges responses and + // emit the data response. + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data).verifyComplete(); + + assertThat(callIndex.get()).describedAs("executeFunc should be called once per physical fetch").isEqualTo(4); + } + + @Test(groups = { "unit" }, timeOut = 10_000) + public void nextPage_endLsnSet_notModifiedPagesAllowedTrue_surfacesNoChangesUntilHasFetchedAllChanges() { + // Spark batch reads set BOTH endLSN AND notModifiedPagesAllowed=true, routing through + // ChangeFeedFetcher.nextPageInternal branch 1 (the + // `completeAfterAllCurrentChangesRetrieved || endLSN != null` path). That branch calls + // surfaceOrSwallowNoChangesPage(r) on noChanges pages, then terminates via + // hasFetchedAllChanges -> disableShouldFetchMore(). This test pins both halves of the + // contract: empty pages surface to the caller (per-page timeout enforcement) AND + // iteration eventually terminates when hasFetchedAllChanges returns true. + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + + FeedResponse noChanges1 = changeFeedNoChanges("t1"); + FeedResponse noChanges2 = changeFeedNoChanges("t2"); + FeedResponse terminal = changeFeedNoChanges("t3"); + + // hasFetchedAllChanges returns true only for the terminal page (reference identity). + when(continuation.hasFetchedAllChanges(any(), any())).thenAnswer(invocation -> { + FeedResponse rsp = invocation.getArgument(0); + return rsp == terminal; + }); + + FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, terminal }; + AtomicInteger callIndex = new AtomicInteger(); + Function>> executeFunc = + req -> Mono.just(script[callIndex.getAndIncrement()]); + + ChangeFeedFetcher fetcher = + newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, /* endLSN */ 999L, executeFunc); + + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges1).verifyComplete(); + assertThat(fetcher.shouldFetchMore()).describedAs("after noChanges1, mid-cycle").isTrue(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges2).verifyComplete(); + assertThat(fetcher.shouldFetchMore()).describedAs("after noChanges2, mid-cycle").isTrue(); + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == terminal).verifyComplete(); + assertThat(fetcher.shouldFetchMore()) + .describedAs("hasFetchedAllChanges=true on terminal page MUST stop Paginator from polling again") + .isFalse(); + assertThat(callIndex.get()) + .describedAs("after termination, executeFunc must NOT be called again") + .isEqualTo(3); + } + + @Test(groups = { "unit" }, timeOut = 10_000) + public void nextPage_endLsnSet_notModifiedPagesAllowedFalse_swallowsNoChangesViaRepeatWhenEmpty() { + // Legacy regression guard for branch 1 with the default notModifiedPagesAllowed=false. + // The 2 noChanges responses should be swallowed via repeatWhenEmpty; only the data + // page should surface from a SINGLE nextPage() call (matching legacy behavior). + FeedRangeContinuation continuation = mock(FeedRangeContinuation.class); + when(continuation.isDone()).thenReturn(false); + + FeedResponse noChanges1 = changeFeedNoChanges("t1"); + FeedResponse noChanges2 = changeFeedNoChanges("t2"); + FeedResponse data = changeFeedDataPage("t3", new Document()); + + // hasFetchedAllChanges returns false for all 3 (iteration shouldn't terminate via this signal). + when(continuation.hasFetchedAllChanges(any(), any())).thenReturn(false); + + FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, data }; + AtomicInteger callIndex = new AtomicInteger(); + Function>> executeFunc = + req -> Mono.just(script[callIndex.getAndIncrement()]); + + ChangeFeedFetcher fetcher = + newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ false, /* endLSN */ 999L, executeFunc); + + // A single nextPage() should drain all 2 noChanges responses via repeatWhenEmpty + // and surface the data page. + StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data).verifyComplete(); + assertThat(callIndex.get()).describedAs("executeFunc should be called once per physical fetch").isEqualTo(3); + } + + // Note: the >4*(size+1) consecutive-304 defense in FeedRangeCompositeContinuationImpl is + // pinned end-to-end by CosmosContainerChangeFeedTest.changeFeedQuery_notModifiedPagesAllowed_* + // (emulator group). That impl class is package-private, so a unit-level integration test + // here would require reflection — the emulator test exercises the real production path. + + // ---- helpers ---- + + private static FeedResponse changeFeedNoChanges(String continuationToken) { + return FeedResponseBuilder.changeFeedResponseBuilder(Document.class) + .withContinuationToken(continuationToken) + .lastChangeFeedPage() + .build(); + } + + private static FeedResponse changeFeedDataPage(String continuationToken, Document... docs) { + return FeedResponseBuilder.changeFeedResponseBuilder(Document.class) + .withContinuationToken(continuationToken) + .withResults(docs) + .build(); + } + + private static ChangeFeedFetcher newFetcher(FeedRangeContinuation continuation, boolean notModifiedPagesAllowed) { + return newFetcherWithExecuteFunc(continuation, notModifiedPagesAllowed, /* endLSN */ null, req -> Mono.empty()); + } + + private static ChangeFeedFetcher newFetcherWithExecuteFunc( + FeedRangeContinuation continuation, + boolean notModifiedPagesAllowed, + Function>> executeFunc) { + + return newFetcherWithExecuteFunc(continuation, notModifiedPagesAllowed, /* endLSN */ null, executeFunc); + } + + private static ChangeFeedFetcher newFetcherWithExecuteFunc( + FeedRangeContinuation continuation, + boolean notModifiedPagesAllowed, + Long endLSN, + Function>> executeFunc) { + + RxDocumentClientImpl client = mock(RxDocumentClientImpl.class); + IRetryPolicyFactory resetSessionTokenRetryPolicy = mock(IRetryPolicyFactory.class); + DocumentClientRetryPolicy noOpRetryPolicy = mock(DocumentClientRetryPolicy.class); + when(noOpRetryPolicy.shouldRetry(any())).thenReturn(Mono.just(ShouldRetryResult.noRetry())); + when(resetSessionTokenRetryPolicy.getRequestPolicy(any())).thenReturn(noOpRetryPolicy); + when(client.getResetSessionTokenRetryPolicy()).thenReturn(resetSessionTokenRetryPolicy); + ChangeFeedState changeFeedState = mock(ChangeFeedState.class); + FeedRangeInternal feedRange = FeedRangeEpkImpl.forFullRange(); + when(changeFeedState.getContinuation()).thenReturn(continuation); + when(changeFeedState.getFeedRange()).thenReturn(feedRange); + doNothing().when(changeFeedState).populateRequest(any(RxDocumentServiceRequest.class), anyInt()); + + Supplier createRequestFunc = ChangeFeedFetcherNotModifiedPagesTest::stubRequest; + + Map requestOptionProperties = new HashMap<>(); + GlobalEndpointManager gem = mock(GlobalEndpointManager.class); + GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker gpe = + mock(GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.class); + when(gpe.isPerPartitionLevelCircuitBreakingApplicable(any())).thenReturn(false); + + return new ChangeFeedFetcher<>( + client, + createRequestFunc, + executeFunc, + changeFeedState, + requestOptionProperties, + /* top */ -1, + /* maxItemCount */ 100, + /* isSplitHandlingDisabled */ true, + /* completeAfterAllCurrentChangesRetrieved */ false, + notModifiedPagesAllowed, + endLSN, + /* operationContext */ null, + gem, + gpe, + /* diagnosticsClientContext */ null); + } + + private static RxDocumentServiceRequest stubRequest() { + RxDocumentServiceRequest request = mock(RxDocumentServiceRequest.class); + when(request.getIsNameBased()).thenReturn(true); + when(request.getResourceAddress()).thenReturn("dbs/db1/colls/coll1"); + when(request.getOperationType()).thenReturn(OperationType.ReadFeed); + when(request.getResourceType()).thenReturn(ResourceType.Document); + // requestContext and faultInjectionRequestContext are public fields on the real class; + // direct assignment on the mock works (Mockito only intercepts method calls). + request.requestContext = new DocumentServiceRequestContext(); + request.faultInjectionRequestContext = new FaultInjectionRequestContext(); + return request; + } + + // isFullyDrained is `protected` on Fetcher; this test class lives in the same package + // (com.azure.cosmos.implementation.query), so we can call it directly without reflection. + private static boolean invokeIsFullyDrained( + ChangeFeedFetcher fetcher, + FeedResponse response) { + + return fetcher.isFullyDrained(/* isChangeFeed */ true, response); + } +} diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 4d6c3781b00b..e43597e1d10c 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -10,6 +10,7 @@ #### Bugs Fixed #### Other Changes +* Added an internal `emptyPagesAllowed` field on `CosmosChangeFeedRequestOptionsImpl` (default `false`; not exposed publicly). When set, `ChangeFeedFetcher` surfaces 304/empty pages to the caller. Consumed by the Cosmos Spark connector to fix an `OperationCancelledException` on sparse cross-partition change-feed workloads. Default behavior is unchanged. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276) * Replaced per-client `Schedulers.newSingle()` schedulers in `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` with shared `BoundedElastic` schedulers in `CosmosSchedulers` to prevent thread count from scaling linearly with client/tenant count. - See [PR 49062](https://github.com/Azure/azure-sdk-for-java/pull/49062) * Fixed a sporadic `NullPointerException` in `JsonSerializable.getWithMapping` triggered by concurrent first-time calls to `DatabaseAccount.getConsistencyPolicy()` and its sibling lazy getters (`getReplicationPolicy`, `getSystemReplicationPolicy`, `getQueryEngineConfiguration`). The fix makes `JsonSerializable.propertyBag` `final`, closing an unsafe-publication race in the lazy-initialisation pattern. - See [Issue 49256](https://github.com/Azure/azure-sdk-for-java/issues/49256) and [PR #49258](https://github.com/Azure/azure-sdk-for-java/pull/49258) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java index c8bca12fcbf4..f280e2562159 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java @@ -121,6 +121,7 @@ public Flux> executeAsync() { this.options.getMaxPrefetchPageCount(), ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options), this.options.isCompleteAfterAllCurrentChangesRetrieved(), + changeFeedOptionsAccessor().getAllowNotModifiedPages(this.options), changeFeedOptionsAccessor() .getEndLSN(this.options), changeFeedOptionsAccessor() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java index 8c4f6fdd646b..429d13f24da0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java @@ -53,6 +53,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ private boolean completeAfterAllCurrentChangesRetrieved; private Long endLSN; private ReadConsistencyStrategy readConsistencyStrategy; + private boolean notModifiedPagesAllowed; public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) { if (toBeCloned.continuationState != null) { @@ -80,6 +81,48 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB this.keywordIdentifiers = toBeCloned.keywordIdentifiers; this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved; this.endLSN = toBeCloned.endLSN; + this.notModifiedPagesAllowed = toBeCloned.notModifiedPagesAllowed; + } + + /** + * Inherits all non-token-encoded fields from {@code source} onto this instance, preserving the + * caller-supplied configuration when this instance was freshly built from a continuation token. + * + *

The four fields encoded in the continuation token itself ({@code continuationState}, + * {@code feedRangeInternal}, {@code mode}, {@code startFromInternal}) are intentionally NOT + * copied — they describe "where to resume from" and must come from the token, not the caller's + * pre-resume options. Every other field IS copied so the caller's configuration (endLSN, + * customSerializer, excludeRegions, read-consistency strategy, throughput-control group, + * diagnostic thresholds, etc.) survives the {@code byPage(savedContinuation)} round-trip. + * + *

Maintenance contract: when a new field is added to this class, decide whether the + * continuation token encodes it. If not (the common case for caller-supplied configuration), + * propagate it here. + */ + public void inheritNonContinuationFieldsFrom(CosmosChangeFeedRequestOptionsImpl source) { + // continuationState, feedRangeInternal, mode, startFromInternal: + // intentionally NOT copied (encoded in the continuation token itself). + // collectionRid IS preserved: it lives on the impl but is not embedded in the + // continuation token (the token's separate containerRid lives on ChangeFeedStateV1). + // The rest-of-SDK clone path (RxDocumentClientImpl.queryDocumentChangeFeedFromPagedFluxInternal + // -> accessor.clone -> copy ctor) preserves collectionRid; we match that here. + this.maxItemCount = source.maxItemCount; + this.maxPrefetchPageCount = source.maxPrefetchPageCount; + this.isSplitHandlingDisabled = source.isSplitHandlingDisabled; + this.quotaInfoEnabled = source.quotaInfoEnabled; + this.throughputControlGroupName = source.throughputControlGroupName; + this.customOptions = source.customOptions; + this.operationContextAndListenerTuple = source.operationContextAndListenerTuple; + this.thresholds = source.thresholds; + this.excludeRegions = source.excludeRegions; + this.customSerializer = source.customSerializer; + this.partitionKeyDefinition = source.partitionKeyDefinition; + this.collectionRid = source.collectionRid; + this.keywordIdentifiers = source.keywordIdentifiers; + this.completeAfterAllCurrentChangesRetrieved = source.completeAfterAllCurrentChangesRetrieved; + this.endLSN = source.endLSN; + this.readConsistencyStrategy = source.readConsistencyStrategy; + this.notModifiedPagesAllowed = source.notModifiedPagesAllowed; } public CosmosChangeFeedRequestOptionsImpl( @@ -185,6 +228,39 @@ public void setQuotaInfoEnabled(boolean quotaInfoEnabled) { this.quotaInfoEnabled = quotaInfoEnabled; } + /** + * Returns whether the change-feed pipeline surfaces 304/noChanges (empty) pages to the caller. + * + * @return {@code true} when each 304/noChanges page is surfaced individually (default is {@code false}). + */ + public boolean isNotModifiedPagesAllowed() { + return this.notModifiedPagesAllowed; + } + + /** + * Controls whether {@code ChangeFeedFetcher} surfaces 304/noChanges pages to the caller instead + * of swallowing them via Reactor's {@code repeatWhenEmpty}. Defaults to {@code false} (legacy + * behavior: empty pages are absorbed and only the next non-empty page is emitted). + * + *

When set to {@code true}, every physical 304 response surfaces as its own + * {@code FeedResponse}, so the SDK's per-page end-to-end timeout applies to each page rather + * than being exceeded by serial empty-page drains. Caller iterators MUST handle empty + * {@code FeedResponse} pages without entering retry loops. + * + *

Intentionally not surfaced on the public {@link com.azure.cosmos.models.CosmosChangeFeedRequestOptions} + * API. The flag changes paging semantics in subtle ways the SDK does not want most callers + * to opt into; reachable only from sibling modules (e.g. the Cosmos Spark connector) via the + * {@code ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper} bridge accessor. + * + * @param notModifiedPagesAllowed {@code true} to surface 304/noChanges pages individually; + * {@code false} (default) to swallow them via {@code repeatWhenEmpty}. + * @return this instance for fluent chaining. + */ + public CosmosChangeFeedRequestOptionsImpl setNotModifiedPagesAllowed(boolean notModifiedPagesAllowed) { + this.notModifiedPagesAllowed = notModifiedPagesAllowed; + return this; + } + public void setDiagnosticsThresholds( CosmosDiagnosticsThresholds operationSpecificThresholds) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index c48555496c29..ea0529f70d44 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -444,6 +444,29 @@ public interface CosmosChangeFeedRequestOptionsAccessor { void setPartitionKeyDefinition(CosmosChangeFeedRequestOptions changeFeedRequestOptions, PartitionKeyDefinition partitionKeyDefinition); Map getProperties(CosmosChangeFeedRequestOptions changeFeedRequestOptions); CosmosChangeFeedRequestOptions disableSplitHandling(CosmosChangeFeedRequestOptions changeFeedRequestOptions); + + /** + * Change-feed-side analogue of {@link CosmosQueryRequestOptionsAccessor#setAllowEmptyPages(CosmosQueryRequestOptions, boolean)}. + * Controls whether {@code ChangeFeedFetcher} surfaces 304/NotModified pages (originating from + * sub-partitions that report no changes) to the caller instead of swallowing them via + * {@code repeatWhenEmpty}. + * + *

Default is {@code false} (legacy swallow behavior). When {@code true}, every physical + * 304 response surfaces as its own {@code FeedResponse}; caller iterators must handle + * empty {@code FeedResponse} pages without entering retry loops. Intentionally NOT + * exposed on the public {@code CosmosChangeFeedRequestOptions} API — friend-only. + * + *

Naming differs from the query-side {@code setAllowEmptyPages} on purpose: on change + * feed, data-bearing empty pages already bubble up; this flag specifically opts into + * surfacing 304/NotModified sub-partition pages. + */ + void setAllowNotModifiedPages(CosmosChangeFeedRequestOptions options, boolean notModifiedPagesAllowed); + + /** + * Returns whether 304/NotModified pages are surfaced individually to the caller. See + * {@link #setAllowNotModifiedPages(CosmosChangeFeedRequestOptions, boolean)}. + */ + boolean getAllowNotModifiedPages(CosmosChangeFeedRequestOptions options); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java index d39f5dfc059c..01ea5f29680a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java @@ -45,6 +45,7 @@ private static ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccess private final Supplier createRequestFunc; private final Supplier feedRangeContinuationRetryPolicySupplier; private final boolean completeAfterAllCurrentChangesRetrieved; + private final boolean notModifiedPagesAllowed; private final Long endLSN; public ChangeFeedFetcher( @@ -57,6 +58,7 @@ public ChangeFeedFetcher( int maxItemCount, boolean isSplitHandlingDisabled, boolean completeAfterAllCurrentChangesRetrieved, + boolean notModifiedPagesAllowed, Long endLSN, OperationContextAndListenerTuple operationContext, GlobalEndpointManager globalEndpointManager, @@ -85,6 +87,7 @@ public ChangeFeedFetcher( diagnosticsClientContext); this.createRequestFunc = createRequestFunc; this.completeAfterAllCurrentChangesRetrieved = completeAfterAllCurrentChangesRetrieved; + this.notModifiedPagesAllowed = notModifiedPagesAllowed; this.endLSN = endLSN; } @@ -118,44 +121,66 @@ public Mono> nextPage() { private Mono> nextPageInternal(DocumentClientRetryPolicy retryPolicy) { return Mono.fromSupplier(() -> nextPageCore(retryPolicy)) .flatMap(Function.identity()) - .flatMap((r) -> { - FeedRangeContinuation continuationSnapshot = - this.changeFeedState.getContinuation(); - - if (this.completeAfterAllCurrentChangesRetrieved || this.endLSN != null) { - if (continuationSnapshot != null) { - - //track the end-LSN for each sub-feedRange and then find the next sub-feedRange to fetch more changes - boolean shouldComplete = continuationSnapshot.hasFetchedAllChanges(r, endLSN); - if (shouldComplete) { - this.disableShouldFetchMore(); - return Mono.just(r); - } - - if (ModelBridgeInternal.noChanges(r)) { - // if we have reached here, it means we have got 304 for the current feedRange, - // but we need to continue drain the changes from other sub-feedRange - this.reEnableShouldFetchMoreForRetry(); - return Mono.empty(); - } - } - } else { - // complete query based on 304s - if (continuationSnapshot != null && - continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) { - - // not all continuations have been drained yet - // repeat with the next continuation - this.reEnableShouldFetchMoreForRetry(); - return Mono.empty(); - } - } - - return Mono.just(r); - }) + .flatMap(this::applyNoChangesDecision) .repeatWhenEmpty(o -> o); } + /** + * Decides what to do with a single FeedResponse before it reaches the outer Paginator loop: + * surface it, swallow it via {@code repeatWhenEmpty}, or terminate iteration entirely. The + * decision depends on the change-feed mode (bounded vs streaming), whether the response is + * a noChanges 304, the continuation's {@code handleChangeFeedNotModified} signal, and whether + * the caller opted into {@code notModifiedPagesAllowed=true}. + */ + private Mono> applyNoChangesDecision(FeedResponse r) { + FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation(); + + if (this.completeAfterAllCurrentChangesRetrieved || this.endLSN != null) { + if (continuationSnapshot != null) { + //track the end-LSN for each sub-feedRange and then find the next sub-feedRange to fetch more changes + boolean shouldComplete = continuationSnapshot.hasFetchedAllChanges(r, endLSN); + if (shouldComplete) { + this.disableShouldFetchMore(); + return Mono.just(r); + } + + if (ModelBridgeInternal.noChanges(r)) { + // 304 for the current sub-feedRange; need to drain the next one. + return surfaceOrRetryNoChangesPage(r); + } + } + } else { + // Streaming change feed (no endLSN). Terminate either when no continuation + // exists or when handleChangeFeedNotModified signals NO_RETRY (single-partition + // case, multi-partition full-cycle complete, or the >4*(size+1) consecutive-304 + // defense in FeedRangeCompositeContinuationImpl). + if (continuationSnapshot != null) { + ShouldRetryResult retryResult = continuationSnapshot.handleChangeFeedNotModified(r); + if (retryResult == ShouldRetryResult.RETRY_NOW) { + // not all continuations have been drained yet; repeat with the next continuation + return surfaceOrRetryNoChangesPage(r); + } + } + } + + return Mono.just(r); + } + + /** + * Either surface a noChanges page to the caller (when notModifiedPagesAllowed=true) or swallow it via + * Reactor's repeatWhenEmpty (the legacy behavior). When swallowing, shouldFetchMore must be + * re-enabled first because isFullyDrained() already flipped it off for the noChanges page. + */ + private Mono> surfaceOrRetryNoChangesPage(FeedResponse r) { + this.reEnableShouldFetchMoreForRetry(); + + if (this.notModifiedPagesAllowed) { + ModelBridgeInternal.setFeedResponseContinuationToken(this.changeFeedState.toString(), r); + return Mono.just(r); + } + return Mono.empty(); + } + @Override protected String applyServerResponseContinuation( String serverContinuationToken, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java index cbd3ec9fba90..bf2fd3d332cd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java @@ -97,6 +97,7 @@ public static Flux> getChangeFeedQueryResultAsObservable( int preFetchCount, boolean isSplitHandlingDisabled, boolean completeAfterAllCurrentChangesRetrieved, + boolean emptyPagesAllowed, Long endLsn, OperationContextAndListenerTuple operationContext, DiagnosticsClientContext diagnosticsClientContext) { @@ -112,6 +113,7 @@ public static Flux> getChangeFeedQueryResultAsObservable( maxPageSize, isSplitHandlingDisabled, completeAfterAllCurrentChangesRetrieved, + emptyPagesAllowed, endLsn, operationContext, client.getGlobalEndpointManager(), diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java index dba5a536ddb8..e8a155cafae2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java @@ -442,11 +442,18 @@ CosmosChangeFeedRequestOptions withCosmosPagedFluxOptions( CosmosChangeFeedRequestOptions effectiveRequestOptions = this; if (pagedFluxOptions.getRequestContinuation() != null) { + // Rebuild from the saved continuation token (this produces the 4 token-encoded + // fields: continuationState, feedRangeInternal, mode, startFromInternal) and then + // inherit every other field from the caller's original options. Without this + // inheritance, a byPage(savedContinuation) round-trip would silently drop + // caller-supplied configuration like endLSN, customSerializer, excludeRegions, + // readConsistencyStrategy, etc. See + // CosmosChangeFeedRequestOptionsImpl.inheritNonContinuationFieldsFrom for the + // exhaustive field-by-field rationale. effectiveRequestOptions = CosmosChangeFeedRequestOptions.createForProcessingFromContinuation( pagedFluxOptions.getRequestContinuation()); - effectiveRequestOptions.setMaxPrefetchPageCount(this.getMaxPrefetchPageCount()); - effectiveRequestOptions.setThroughputControlGroupName(this.getThroughputControlGroupName()); + effectiveRequestOptions.getImpl().inheritNonContinuationFieldsFrom(this.actualRequestOptions); } if (pagedFluxOptions.getMaxItemCount() != null) { @@ -782,6 +789,16 @@ public Map getProperties(CosmosChangeFeedRequestOptions changeFe public CosmosChangeFeedRequestOptions disableSplitHandling(CosmosChangeFeedRequestOptions changeFeedRequestOptions) { return changeFeedRequestOptions.disableSplitHandling(); } + + @Override + public void setAllowNotModifiedPages(CosmosChangeFeedRequestOptions options, boolean notModifiedPagesAllowed) { + options.getImpl().setNotModifiedPagesAllowed(notModifiedPagesAllowed); + } + + @Override + public boolean getAllowNotModifiedPages(CosmosChangeFeedRequestOptions options) { + return options.getImpl().isNotModifiedPagesAllowed(); + } }); }