[Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout#49276
[Cosmos Spark] Surface empty change feed pages to avoid end-to-end timeout#49276tvaron3 wants to merge 17 commits into
Conversation
…meout
Spark partition tasks reading change feed (or executing cross-partition
queries) against a sparse workload could hit OperationCancelledException
("End-to-end timeout hit when trying to retrieve the next page") at the
connector's 65-second per-operation end-to-end timeout. Root cause: with
the default emptyPagesAllowed=false, ParallelDocumentQueryExecutionContext
and ChangeFeedFetcher swallow empty / 304 pages internally — a single
producer-side nextPage() call can keep draining many sub-feedRanges before
emitting one non-empty page. For sparse workloads the cumulative time blows
the per-operation timeout.
Fix:
* Spark ItemsPartitionReader (query path) calls
setAllowEmptyPages(true) on the CosmosQueryRequestOptions so the SDK's
existing emptyPagesAllowed plumbing applies.
* New internal-only emptyPagesAllowed flag on
CosmosChangeFeedRequestOptionsImpl (default false; behavior unchanged
for all other callers) plumbed through Paginator.
getChangeFeedQueryResultAsObservable into ChangeFeedFetcher.
nextPageInternal. When the flag is true, both 304 branches return
Mono.just(r) so empty pages bubble up to the iterator. Surfaced via
new package-private bridge accessor
CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages.
* ChangeFeedFetcher.isFullyDrained no longer short-circuits to true on
noChanges responses (it now consults only continuation.isDone()),
which removes the load-bearing reEnableShouldFetchMoreForRetry()
pattern that was previously needed to undo a base-class decision.
* Spark ChangeFeedPartitionReader opts into the new flag via the bridge
accessor.
* CosmosChangeFeedRequestOptions.withCosmosPagedFluxOptions now also
propagates emptyPagesAllowed when the paged-flux pull mechanism
supplies a continuation token (the freshly-built impl would otherwise
silently lose the flag — comment added flagging the broader drift
hazard).
Tests:
* New ChangeFeedFetcherEmptyPagesTest (5 unit tests): exercises the
isFullyDrained behavior change and asserts that nextPageInternal
surfaces noChanges responses individually when the flag is true and
swallows them via repeatWhenEmpty when the flag is false.
* New CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest (3 unit
tests): locks in the flag propagation through
withCosmosPagedFluxOptions.
* Extended TransientIOErrorsRetryingIteratorSpec with a regression test
that drains hundreds of leading empty pages followed by data without
hitting the end-to-end timeout.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
This PR updates the Cosmos DB Java SDK and Cosmos Spark connector to optionally surface empty (304 / noChanges) change feed pages to callers, preventing Spark partition tasks from exceeding the connector’s 65s per-operation end-to-end timeout when draining sparse, cross-partition workloads.
Changes:
- Adds an internal
emptyPagesAllowedflag for change feed requests and plumbs it throughChangeFeedQueryImpl→Paginator→ChangeFeedFetcher, so empty pages can be emitted instead of swallowed. - Updates Cosmos Spark readers (query + change feed) to opt into “allow empty pages” so each physical page is bounded by the per-page timeout window.
- Adds new unit/regression tests covering the new behavior and the paged-flux continuation rebuild path; updates changelogs accordingly.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java | Preserves emptyPagesAllowed across paged-flux continuation rebuild; exposes internal accessor bridge methods. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java | Adds emptyPagesAllowed parameter and forwards it into ChangeFeedFetcher. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java | Implements “surface empty pages” behavior and simplifies isFullyDrained to rely on continuation completion. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java | Extends change feed request options accessor interface with allow-empty-pages getters/setters. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java | Adds internal emptyPagesAllowed state and cloning support. |
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java | Passes allow-empty-pages option into the paginator pipeline. |
| sdk/cosmos/azure-cosmos/CHANGELOG.md | Documents the fix and clarifies Spark connector behavior. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java | Adds regression coverage for surfacing vs swallowing empty pages and new isFullyDrained semantics. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java | Ensures allow-empty-pages survives withCosmosPagedFluxOptions continuation rebuild path. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala | Enables “allow empty pages” for Spark query reads. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala | Enables “allow empty pages” for Spark change feed reads. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala | Adds Spark regression test for long empty-page runs without timing out. |
| sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
| sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md | Notes the Spark connector fix for sparse workloads. |
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…wed=false
The previous cleanup of ChangeFeedFetcher.isFullyDrained removed the
noChanges short-circuit unconditionally, on the rationale that consulting
continuation.isDone() was simpler. That regressed every non-Spark caller of
the change-feed API:
FeedRangeCompositeContinuationImpl.isDone() returns
compositeContinuationTokens.size() == 0, but moveToNextToken() rotates the
deque via poll() + add() and never shrinks it. So isDone() is permanently
false for normal incremental change-feed iteration. For the default
emptyPagesAllowed=false path:
1. The 304 arrives.
2. updateState calls isFullyDrained -> false (because isDone() is false).
3. nextPageInternal's else-branch sees handleChangeFeedNotModified return
NO_RETRY (single-partition case, multi-partition cycle-complete, or
the >4*(size+1) consecutive-304 defense) and falls through to
Mono.just(r).
4. Paginator's generate-loop checks shouldFetchMore() -> true and calls
nextPage() again -> infinite poll loop.
Customer-visible impact would be: any consumer that drains
queryChangeFeed(...).byPage() to completion (e.g.
.toIterable().iterator(), .collectList(), .blockLast()) hangs forever once
the change feed catches up.
flag is true (Spark path), surface every noChanges to the caller and let
the consumer decide when to stop iterating. When the flag is false (every
other caller, including the SDK's public queryChangeFeed API), preserve
the original termination signal.
Also addressed reviewer feedback:
* Drop unused org.mockito.Mockito import (would break checkstyle's
UnusedImports rule).
* Replace reflective field assignment in stubRequest() with direct field
writes on the public fields RxDocumentServiceRequest.requestContext
and .faultInjectionRequestContext. Mockito only intercepts method
calls; field writes on a mock work directly.
* Add a regression test
isFullyDrained_noChangesResponseWithEmptyPagesAllowedFalse_returnsTrue
that locks in the termination signal for the default path.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Defense-in-depth (F1): when emptyPagesAllowed=true the streaming change- feed path takes branch 2 of nextPageInternal. If handleChangeFeedNotModified returns NO_RETRY on a noChanges page (single- partition case, multi-partition full-cycle complete, or the >4*(size+1) consecutive-304 defense in FeedRangeCompositeContinuationImpl), the SDK's built-in termination signal was being silently dropped because isFullyDrained() consults only continuation.isDone() in that mode (which is permanently false for incremental change feed). Now we explicitly call disableShouldFetchMore() to preserve the defense-in-depth termination guarantee even for emptyPagesAllowed=true callers. * DRY (F4): extracted the two near-identical 'surface or swallow via repeatWhenEmpty' blocks in nextPageInternal into a private surfaceOrSwallowNoChangesPage(r) helper. The branches now read as a one-line intent instead of seven near-identical lines. * Comment density (F9): tightened the long isFullyDrained comment to a 2-line tl;dr followed by the detailed rationale. * Test (F2): new nextPage_emptyPagesAllowedTrueWithNoRetryOnNoChanges_ terminatesIteration locks in the defense-in-depth fix - asserts that after a terminal NO_RETRY noChanges page, shouldFetchMore() flips to false so Paginator stops calling nextPage(). * Test (F3): added Mockito.verify(continuation, never()).isDone() to the pass-2 regression test so a future refactor that accidentally drops the noChanges short-circuit and falls through to the (permanently-false) continuation.isDone() check fails loudly instead of silently hanging. Test results: ChangeFeedFetcherEmptyPagesTest 7/7, FetcherTest 5/5, CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3, TransientIOErrorsRetryingIteratorSpec 7/7 - all green. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Test (F1): assert callIndex==4 after NO_RETRY termination in the pass-3 defense-in-depth test so a future regression that terminates iteration but still over-fetches is caught. * Test (F2): new nextPage_emptyPagesAllowedTrueWithDataPages_doesNotTerminate pins the production contract that the noChanges(r) guard on the disableShouldFetchMore() arm is load-bearing. In production, FeedRangeCompositeContinuationImpl.handleChangeFeedNotModified returns NO_RETRY for EVERY non-noChanges response (the early branch resets state and falls through). Without the noChanges(r) guard, every data page would silently truncate iteration after the first emission. * Comment (F5): added an inline rationale next to the noChanges(r) guard explaining why it must remain - prevents a future engineer from 'simplifying' away the guard without realizing the production truncation hazard. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
/azp run java - cosmos - tests |
|
Azure Pipelines successfully started running 1 pipeline(s). |
* Drop the new CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages
wrapper methods. Callers (ChangeFeedQueryImpl, Spark ChangeFeedPartitionReader,
the propagation unit test) now use the already-exposed
accessor.getImpl(options).{is,set}EmptyPagesAllowed() instead, keeping the
bridge accessor interface at its pre-PR shape.
* Move the azure-cosmos CHANGELOG entry from 'Bugs Fixed' to 'Other Changes'
and reword: this PR adds an internal-only field on
CosmosChangeFeedRequestOptionsImpl that pure SDK consumers cannot reach
without going through getImpl(). The customer-facing fix lives in the Spark
connector CHANGELOGs (which keep their 'Bugs Fixed' entries).
Test results: ChangeFeedFetcherEmptyPagesTest 8/8, FetcherTest 5/5,
CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest 3/3,
TransientIOErrorsRetryingIteratorSpec 7/7 - all green.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
@sdkReviewAgent |
kushagraThapar
left a comment
There was a problem hiding this comment.
Automated multi-reviewer code review — Tier 3 (Deep)
Reviewers dispatched in parallel: PR Deep Reviewer (correctness) + Engineering Quality Reviewer (code quality / testing / perf / agentic) + cosmos-pr-reviewer (Cosmos design alignment) + Cross SDK Reviewer (peers: dotnet, python).
Risk score 8 (public_api +4, query+retry capped +6, source/test-ratio −2). Design-doc clone synced fresh against tvaron3/cosmosdb-design-docs@main. Existing inline comments from copilot-pull-request-reviewer were de-duped — all 3 were already addressed by the author in commit 08f690b.
Overall verdict
Approve with suggestions. Correctness story is solid — PR Deep Reviewer independently verified every claim in the PR body against the sandbox (10 claims, all ✅). Cross SDK Reviewer confirmed neither .NET nor Python has an equivalent flag (because neither ships CosmosEndToEndOperationLatencyPolicyConfig per-operation timeout) — Java is deliberately and correctly divergent. cosmos-pr-reviewer found no domain-correctness regressions, only design-doc gaps.
Findings cluster into four addressable themes: (1) stale PR description [major], (2) broader drift hazard in continuation-rebuild copy list [major], (3) test-timeout discipline to match .NET parity [major], (4) various minor API hygiene / observability / javadoc gaps.
Blockers
(none)
Major findings
M1. PR description references removed bridge accessor methods — stale and misleading
Files: PR body (sections 2 & 5)
Severity: major | Category: code_quality
PR body claims:
"Exposed via new package-private bridge accessor
CosmosChangeFeedRequestOptionsAccessor.{get,set}AllowEmptyPages"
But commit 34d3da4fa51 ("Shrink bridge accessor surface") removed those wrapper methods. Spark now uses accessor.getImpl(options).setEmptyPagesAllowed(true) — the existing getImpl() escape hatch.
Verified:
$ grep -n "setEmptyPagesAllowed\|isEmptyPagesAllowed\|setAllowEmptyPages" \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
300: void setAllowEmptyPages(CosmosQueryRequestOptions options, boolean emptyPagesAllowed);
302: boolean getAllowEmptyPages(CosmosQueryRequestOptions options);
# only the pre-existing QUERY-side accessor; no change-feed-side wrapper
Why it matters: Future archaeologists triaging regressions will grep for accessor methods that don't exist.
Suggested fix: Update PR description items 2 & 5 to:
"Exposed via the existing
accessor.getImpl(options).setEmptyPagesAllowed(...)escape hatch — no new bridge accessor wrappers are added (see commit34d3da4fa51for the rationale on keeping the accessor surface small for internal-only flags)."
M2. Drift hazard in withCosmosPagedFluxOptions extends well beyond emptyPagesAllowed
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java:444-458
Severity: major | Category: design_alignment (with code-quality angle)
The PR adds a great drift-hazard comment ("this hand-maintained copy list is a known drift hazard") and propagates one field (emptyPagesAllowed) — but the same code path silently drops ~13 other fields when a user supplies a continuation token via byPage(savedContinuation):
endLSN, customSerializer, keywordIdentifiers, excludeRegions, readConsistencyStrategy, quotaInfoEnabled, customOptions, operationContextAndListenerTuple, thresholds, properties, isSplitHandlingDisabled, completeAfterAllCurrentChangesRetrieved, partitionKeyDefinition, collectionRid
Two of these are real Spark-adjacent footguns:
endLSN— silently lost ⇒ a bounded change feed becomes unbounded afterbyPage(continuation)resumecustomSerializer— silently lost ⇒ custom type deserialization breaks across the continuation boundary
Scope caveat (this is 🟡 not 🔴): This is pre-existing; the PR's comment is genuinely useful and the author explicitly scoped broader cleanup as out-of-scope.
Suggested fix (pick one):
- Minimum: Open a tracking GitHub issue and reference it from the new in-code comment; list the 13 affected fields.
- Better (incremental win in this PR): Add
endLSNandcustomSerializerto the copy list, plus two parallel tests inCosmosChangeFeedRequestOptionsWithPagedFluxOptionsTestmirroring the existingemptyPagesAllowed_isPropagatedtest. - Best (follow-up PR): Invert the default — use the copy constructor
new CosmosChangeFeedRequestOptionsImpl(this.actualRequestOptions)and surgically override only fields encoded in the continuation token. Changes the default from "drop unless listed" to "preserve unless overridden by the continuation". - Also: consider a reflection-based test that iterates declared fields on the impl and asserts each survives the round-trip.
M3. Test-timeout discipline missing on regression-guard tests — .NET parity gap
File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
Severity: major | Category: parity (test-rigor)
The new nextPage_emptyPagesAllowedTrue_* tests use StepVerifier.verifyComplete() with no explicit timeout. .NET's equivalent regression-guard test pins this with a wall-clock timeout so a future refactor reintroducing unbounded repeatWhenEmpty drain fails fast and loudly:
Verified:
$ grep -n "Timeout" Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs
173: [Timeout(5000)] ... public async Task ShouldReturnNotModifiedAfterCyclingOnAllRanges(int partitions)
$ grep -n "timeOut\|Duration.of" sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
# zero hits
Why it matters: Without a timeout, a regression would hang until TestNG's global timeout (often unset on unit suites) and look like CI flake rather than a loud failure. The PR explicitly motivates these tests as defense-in-depth against the very class of regression a wall-clock guard would catch.
Suggested fix: Add timeOut = 10_000 to the @Test annotation on the four nextPage_* tests, e.g.:
@Test(groups = { "unit" }, timeOut = 10_000)OR replace .verifyComplete() with .expectComplete().verify(Duration.ofSeconds(5)). The isFullyDrained_* tests don't need this (synchronous pure method).
This is the single biggest test-rigor gap vs peer SDKs that Cross SDK Reviewer identified.
Minor findings
m1. Missing javadoc on new isEmptyPagesAllowed() / setEmptyPagesAllowed()
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:190-197
Category: code_quality
Sibling CosmosQueryRequestOptionsImpl.setEmptyPagesAllowed (line 312-320) documents "Defaults to false" and the param contract; the new methods have none. Three consumer sites (ChangeFeedFetcher plumbing, withCosmosPagedFluxOptions copy, Spark ChangeFeedPartitionReader) currently rely on tribal knowledge.
Suggested fix: 1-3 line javadoc: "When true, ChangeFeedFetcher surfaces 304/noChanges pages to the caller instead of swallowing them via repeatWhenEmpty; defaults to false. Caller iterators must handle empty FeedResponse pages."
m2. Bridge-accessor asymmetry: change-feed side reaches through getImpl() while query side has a wrapper
Files: ImplementationBridgeHelpers.java:300-302 + ImplementationBridgeHelpers.java:425-447 + ChangeFeedPartitionReader.scala:220-222 + ChangeFeedQueryImpl.java:124
Severity: minor | Category: code_quality
TL;DR: The query-side accessor exposes a first-class wrapper for this exact flag; the change-feed accessor doesn't, so the same logical opt-in is reached via two different code shapes. Grep-discoverability and refactor-fragility cost, not a correctness bug.
The two sides, side-by-side
Query side (pre-existing, untouched by this PR):
// ImplementationBridgeHelpers.java:300-302 — CosmosQueryRequestOptionsAccessor interface
void setAllowEmptyPages(CosmosQueryRequestOptions options, boolean emptyPagesAllowed);
boolean getAllowEmptyPages(CosmosQueryRequestOptions options);// ItemsPartitionReader.scala:54 — Spark call site, one-liner via accessor
.setAllowEmptyPages(queryOptions, true)// ParallelDocumentQueryExecutionContext.java:319 — internal consumer uses the same wrapper
&& !qryOptAccessor().getAllowEmptyPages(this.cosmosQueryRequestOptions)Change-feed side (added in this PR):
// ImplementationBridgeHelpers.java:425-447 — CosmosChangeFeedRequestOptionsAccessor
// (no setAllowEmptyPages / getAllowEmptyPages method on the interface)// ChangeFeedPartitionReader.scala:220-222 — Spark call site reaches through getImpl()
.getImpl(options)
.setEmptyPagesAllowed(true)// ChangeFeedQueryImpl.java:124 — internal consumer also reaches through getImpl()
changeFeedOptionsAccessor().getImpl(this.options).isEmptyPagesAllowed(),Why it's worth flagging
- Grep-discoverability is asymmetric. A future engineer running
grep -rn 'setAllowEmptyPages\|getAllowEmptyPages'to answer "where do we opt callers into per-page empty-page surfacing?" gets only the query-side hits — Spark's change-feed call site andChangeFeedQueryImplare invisible at the symbol level. The cross-feature semantic equivalence disappears. - Refactor blast radius is asymmetric. Rename
setEmptyPagesAllowed→ something else later: query side breaks at the wrapper (single update point); change-feed side breaks at everygetImpl().setEmptyPagesAllowed(...)call site (Spark + internal consumer). - The accessor interface is the friend-API contract. Once an option type has wrapper-style accessors for some flags and
getImpl()-style reachthrough for others, the "what's the right way to access X from a friend module" question stops having a clean answer for that type. - Author has a defensible rationale. Commit
34d3da4fa51("Shrink bridge accessor surface") intentionally removed earlier wrapper additions to keep the accessor surface small. Real value worth honoring — but the cost is asymmetry vs. the sibling type that already exposes the same-named wrapper.
Suggested fix (recommend Option A; both are small)
-
Option A (preferred) — mirror the query side. In
ImplementationBridgeHelpers.javalines 425-447 add:void setAllowEmptyPages(CosmosChangeFeedRequestOptions options, boolean emptyPagesAllowed); boolean getAllowEmptyPages(CosmosChangeFeedRequestOptions options);
Wire them in
CosmosChangeFeedRequestOptions.java(parallel to lines 769-778 of the query options file), and updateChangeFeedPartitionReader.scala:222andChangeFeedQueryImpl.java:124to use the wrapper. Single grep-discoverable symbol across both option types. -
Option B — document the deliberate divergence. Keep the current shape but add a 2-line comment at the Spark call site explaining why this goes through
getImpl()rather than getting its own wrapper, so future engineers don't "fix" the asymmetry by adding the wrapper anyway, and so they understand the surface-shrinking intent.
m3. PR body claims "see in-code comment" — no such comment exists
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:194-197
Category: code_quality
PR body: "deliberately no public setter — see in-code comment". Grep finds nothing.
Verified:
$ grep -rn 'deliberately no public\|public setter' \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java \
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java
# (no output)
The rationale for keeping the flag internal is the single most important note for future maintainers; should live in code, not the PR description (which decays).
Suggested fix: Add a 2-line comment on setEmptyPagesAllowed:
/* Intentionally not surfaced on the public CosmosChangeFeedRequestOptions API.
* Caller must verify the iterator handles empty FeedResponse pages without
* retry loops before opting in. */m4. Test gap: surfaceOrSwallowNoChangesPage is invoked in BOTH branches, only streaming branch tested
File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java
Category: testing
The new helper is called from both branches of nextPageInternal:
completeAfterAllCurrentChangesRetrieved || endLSN != nullbranch (line 140)- streaming branch (lines 152, 165)
All 8 tests configure endLSN=null. Spark's ChangeFeedPartitionReader.scala:212-215 DOES set endLsn for bounded partitions, then unconditionally sets setEmptyPagesAllowed(true) — so the (endLSN != null, emptyPagesAllowed=true) combination is a real production scenario with zero unit coverage.
Verified:
$ grep -n "endLSN\|completeAfterAllCurrentChangesRetrieved" sdk/cosmos/azure-cosmos-tests/.../ChangeFeedFetcherEmptyPagesTest.java
339: /* completeAfterAllCurrentChangesRetrieved */ false,
341: /* endLSN */ null,
Suggested fix: Add 2 tests:
nextPage_endLsnSet_emptyPagesAllowedTrue_surfacesNoChangesUntilHasFetchedAllChanges()nextPage_endLsnSet_emptyPagesAllowedFalse_swallowsNoChangesViaRepeatWhenEmpty()(legacy-mode regression guard)
m5. >4*(size+1) consecutive-304 defense only mock-tested at the fetcher layer
File: sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherEmptyPagesTest.java:165-211
Category: testing
Every nextPage_* test mocks handleChangeFeedNotModified directly. The PR's defense-in-depth argument rests on the real FeedRangeCompositeContinuationImpl returning NO_RETRY after the >4*(size+1) threshold. Mock-based contract tests verify "IF NO_RETRY THEN terminate" but not "the real impl DOES return NO_RETRY after threshold".
Verified:
$ grep -n "when(continuation.handleChangeFeedNotModified" .../ChangeFeedFetcherEmptyPagesTest.java
131, 171, 219, 256: (all mocked, no integration with real impl)
Suggested fix: One integration-style test (still unit group, no real Cosmos client): wire a real FeedRangeCompositeContinuationImpl with 2-3 sub-ranges, feed >4*(size+1) consecutive 304 responses, assert shouldFetchMore() flips to false. Catches contract drift between ChangeFeedFetcher and FeedRangeCompositeContinuationImpl.
m6. Spark CHANGELOGs don't disclose per-page-overhead trade-off
Files: sdk/cosmos/azure-cosmos-spark_3-*_2-1*/CHANGELOG.md
Category: perf / agentic disclosure
All 4 Spark CHANGELOG entries announce the fix but don't document the per-page trade-off: workloads with many empty pages will now produce one Spark iterator callback per surfaced page (vs. previously one callback could drain many). Observability dashboards will see "pages per task" metrics spike; LLM cost-attribution tools will misattribute.
Verified:
$ grep -i 'emptyPagesAllowed\|allow.empty.page' sdk/cosmos/azure-cosmos-spark_3-*_*/CHANGELOG.md
# all 4 entries describe the fix but not the surfacing trade-off
Suggested fix: Append to each Spark CHANGELOG entry:
"Note: workloads with many empty pages will now observe one Spark iterator callback per surfaced page where previously a single callback could drain many — this is the intended trade-off for per-page timeout enforcement."
m7. New flag not reachable from public API — forward-looking design statement missing
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java:194-197
Category: agentic
Opt-in flag is impl-class-only; a future Flink connector, MCP server, or 3rd-party agent that wants the same per-page timeout enforcement must depend on com.azure.cosmos.implementation bridge accessors.
Suggested fix: Either (a) graduate to a public @Beta method with doc-comment warning callers must handle empty FeedResponse pages, OR (b) explicitly document the forward-looking decision ("permanently internal — Spark connector only") in a code comment so future Cosmos team and 3rd-party consumers know which way the wind blows. (Overlaps with m3 — both fixes can ship in one code comment.)
m8. New surfaceOrSwallow decision invisible to logs/diagnostics/OpenTelemetry
File: sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java:183-189
Category: agentic
The new surfaceOrSwallowNoChangesPage decision and the new NO_RETRY-on-noChanges branch are invisible to logs/diagnostics. An LLM diagnostician investigating "how many physical pages did this query drain" has no signal beyond the page count itself.
Suggested fix: Add structured trace:
if (logger.isTraceEnabled()) {
logger.trace("change_feed.empty_page surfacing={} no_retry={} continuation={}",
this.emptyPagesAllowed,
retryResult == ShouldRetryResult.noRetry(),
getContinuationForLogging());
}Structured key=value form per observability convention.
Nits
n1. nextPageInternal complexity at advisory boundary
File: ChangeFeedFetcher.java:121-176
Category: code_quality
56 lines, 4-deep nesting; the load-bearing noChanges(r) && this.emptyPagesAllowed guard is the 4th nested check. Dense comments mitigate but still right at the advisory boundary.
Suggested (optional): Extract the .flatMap((r) -> { ... }) body into a private applyNoChangesDecision(FeedResponse<T> r) method.
n2. Pre-existing no-op in CosmosEncryptionAsyncContainer:1158
Category: code_quality | Severity: nit (pre-existing, out-of-scope for this PR)
getEffectiveCosmosChangeFeedRequestOptions(pagedFluxOptions, finalOptions); discards the return value (the method has no side effects on its receiver). Encryption customers don't get even the partial copy-list propagation. Out of scope — file a follow-up.
n3. Test uses reflection where direct package access would work
File: ChangeFeedFetcherEmptyPagesTest.java:639-646
Category: testing
Test class and Fetcher.isFullyDrained live in the same package (com.azure.cosmos.implementation.query); reflection isn't needed. Reflection-based tests don't fail at compile time on rename refactors.
Suggested fix: Replace reflection with a tiny subclass:
class TestableChangeFeedFetcher extends ChangeFeedFetcher {
boolean callIsFullyDrained(FeedResponse<Document> r) { return isFullyDrained(true, r); }
}n4. No emulator / E2E test reproduces the actual OperationCancelledException scenario
Category: testing | Tag: unverified-pattern-suggestion
Coverage is unit-mock only. Granular mock coverage is appropriate primary defense, but an emulator test would lock in the fix at the integration boundary.
Verified:
$ grep -rln 'OperationCancelled\|end-to-end timeout\|endToEndTimeout\|emptyPagesAllowed' sdk/cosmos/azure-cosmos-tests/src/test
# only the two new files added in this PR
Suggested follow-up (not for this PR): @Test(groups = { "emulator" }) that creates a sparse multi-partition container with aggressive end-to-end-latency policy and asserts byPage().iterator() change-feed read completes without OCE.
Design alignment (citations to design docs)
cosmos-pr-reviewer raised 9 design-alignment findings, almost all of which are propose-design-doc-extension items.
D1. disableShouldFetchMore() bypasses Fetcher.update() top/maxItemCount machinery
File: ChangeFeedFetcher.java:160-169 | Citation: 12-change-feed.md#key-classes
Direct disableShouldFetchMore() call sidesteps the documented Fetcher base-class state machine. Benign because top is always -1 for change feed, but the bypass deserves an inline comment acknowledging the safety argument.
D2. surfaceOrSwallowNoChangesPage helper conflates snapshot and streaming mode semantics
File: ChangeFeedFetcher.java:183-189 | Citation: 12-change-feed.md#key-classes
DRYing the two original call sites risks silently coupling their swallow-vs-surface semantics. Suggested fix: add a comment stating the helper applies to both modes intentionally.
D3-D9. Propose-design-doc-extension items (for the cosmosdb-design-docs repo, NOT this SDK PR)
| ID | Title | Target chapter | Proposed heading |
|---|---|---|---|
| D3 | Incremental change-feed termination contract undocumented | 12-change-feed.md |
## Incremental Change Feed — Termination Contract |
| D4 | >4*(size+1) consecutive-304 defense bail rationale lives only in code |
12-change-feed.md |
append to ### Scenario 4: Breadth-First 304 Cycling |
| D5 | emptyPagesAllowed SDK contract surface (swallow vs surface) undocumented |
12-change-feed.md + cross-ref 07-query-engine.md |
## Empty-Page Handling — Swallow vs Surface |
| D6 | 07-query-engine.md silent on SDK-side ParallelDocumentQueryExecutionContext |
07-query-engine.md |
## SDK-Side Query Execution Contexts |
| D7 | withCosmosPagedFluxOptions continuation-rebuild field drift is a design rule |
12-change-feed.md |
append ### Continuation-Token Round-Trip Field Drift |
| D8 | 17-status-codes-and-sdk-retries.md table missing change-feed 304 semantics cross-ref |
17-status-codes-and-sdk-retries.md |
row 304 entry — add change-feed cross-ref |
| D9 | Merge-amplification of empty pages under emptyPagesAllowed=true not in Scenario 1 |
12-change-feed.md#scenario-1-duplicate-processing-after-merge |
extend impact bullets |
All 9 are non-blocking for this SDK PR. Treat as follow-up issues on the design-docs repo.
Cross-SDK informational divergences
These are not findings to act on, but the user (and any future cross-SDK consistency reviewer) should know:
| Behavior | Java (post-PR) | .NET | Python | Note |
|---|---|---|---|---|
| Default 304 handling on cross-partition CF | repeatWhenEmpty (unbounded — pre-existing) |
Aggregate per cycle, surface one NotModifiedPage |
while True bounded by should_retry_on_not_modified_response() |
Java still divergent in default mode — intentional back-compat preservation |
| Opt-in to surface per-page | emptyPagesAllowed=true (internal-only) |
Not exposed (no equivalent flag) | Not exposed (no equivalent kwarg) | Verified by grep — zero hits in both peers |
| Per-operation end-to-end timeout config | CosmosEndToEndOperationLatencyPolicyConfig |
Not present (only per-request AvailabilityStrategy) |
Not present | This is why .NET/Python don't see the timeout bug |
isFullyDrained-style termination signal |
Yes (Java-specific) | No (uses cycle-bound IsNextRangeEqualToOriginal) |
No (uses _initial_no_result_range cycle tracking) |
The Java regression class has no peer analog |
| Empty cycle UX | Continues polling (Java) | Surfaces NotModifiedPage per cycle |
Terminates iterator entirely (raise StopIteration) |
Materially different — multi-SDK customers will observe different paging shapes |
Cross SDK Reviewer's verdict: Approve. There is no peer SDK to copy the design from; Java's choices are internally consistent with its own pre-existing query-side setAllowEmptyPages and deliberately divergent from peers in a way peers cannot easily match without first shipping a per-operation timeout config.
Verified-negative findings (rigor receipts)
Preserved verbatim — these prove what was checked and explicitly ruled out:
- ❌ "Naming inconsistency
emptyPagesAllowedvsallowEmptyPagesat field/impl layer" — DROPPED. Field name + impl methods exactly match siblingCosmosQueryRequestOptionsImpl(verified by grep on line 25, 308, 317). - ❌ "
TransientIOErrorsRetryingIteratorSpec.scalaonly runs againstazure-cosmos-spark_3and not the 4 publishable variants" — DROPPED. All 4 variantpom.xmls reference<source>${basedir}/../azure-cosmos-spark_3/src/test/scala</source>(shared test source dir; auto-runs against all variants). - ❌ "Coverage gap for
>4*(size+1)defense inFeedRangeCompositeContinuationImpl" — Partially dropped. The NO_RETRY contract IS mock-tested at the fetcher layer (nextPage_emptyPagesAllowedTrueWithNoRetryOnNoChanges_terminatesIteration, lines 442-489). Finding m5 above re-raises the integration-against-real-impl gap, which is a complementary concern. - ❌ "
isFullyDraineddefault-mode regression coverage missing" — DROPPED. TestisFullyDrained_noChangesResponseWithEmptyPagesAllowedFalse_returnsTrue(line 379) exists and explicitly verifies short-circuit fires without consulting the continuation (Mockito.verify(continuation, Mockito.never()).isDone()— load-bearing pin). - ❌ "Behavior change in
(completeAfterAllCurrentChangesRetrieved || endLSN != null)branch foremptyPagesAllowed=falsecallers" — DROPPED. Comparedgit show HEAD~6:.../ChangeFeedFetcher.javato current: bytecode-equivalent for the legacy path (reEnableShouldFetchMoreForRetry(); return Mono.empty();). Behavior preservation verified.
What we explicitly did NOT flag
- PR Deep Reviewer confirmed (via 10/10 ✅ table) every claim in the PR body about:
isDone()permanently false,handleChangeFeedNotModifiedNO_RETRY-for-data, only-one-caller verification onPaginator.getChangeFeedQueryResultAsObservableandnew ChangeFeedFetcher(...), copy constructor includes new field,disableShouldFetchMorecorrectly stopsPaginator, refactor behavior-preserving for all 4 (branch × flag) combinations. - Existing
copilot-pull-request-reviewerinline comments (3) — all addressed by author in commit08f690b, dropped to avoid duplication. - Multi-Spark-variant test coverage — verified shared source dir; no per-variant duplication needed.
- Bridge accessor pattern as language-idiomatic — verified parity with .NET
InternalsVisibleTo(lists 20+ friend assemblies inAssemblyInfo.cs:7-44) and Python_moduleconvention. Java's pattern is parity-aligned in spirit; not over-engineering. isFullyDrainedregression as a Java-specific architecture issue — verified neither .NET nor Python has an analogousIsFullyDrained/IsDone()termination signal; the bug class has no peer analog.
Generated by the PR Review Router (Tier 3 — Deep) — orchestrating PR Deep Reviewer + Engineering Quality Reviewer + cosmos-pr-reviewer + Cross SDK Reviewer in parallel. Posted as a single review comment with explicit user approval after a human review pass.
|
✅ Review complete (26:15) Posted 3 inline comment(s). Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage |
…, polish Major (Kushagra): * M1 — Update PR description to reflect the post-commit-34d3da4 accessor shape (getImpl() escape hatch was being misrepresented as a named accessor wrapper). * M2 — Invert the default in CosmosChangeFeedRequestOptions .withCosmosPagedFluxOptions: instead of building a fresh impl from the continuation token and copying back 4 fields, build from the token and inherit ALL non-token-encoded fields via a new CosmosChangeFeedRequestOptionsImpl.inheritNonContinuationFieldsFrom helper. This closes silent drift for endLSN, customSerializer, excludeRegions, readConsistencyStrategy, thresholds, customOptions, operationContextAndListenerTuple, keywordIdentifiers, completeAfterAllCurrentChangesRetrieved, quotaInfoEnabled, isSplitHandlingDisabled, partitionKeyDefinition, and collectionRid. The 4 token-encoded fields (continuationState, feedRangeInternal, mode, startFromInternal) remain authoritative from the parsed token. Single maintenance point for any future field. * M3 — Add timeOut = 10_000 to the 4 nextPage_* tests in ChangeFeedFetcherEmptyPagesTest so a regression that reintroduces unbounded repeatWhenEmpty drain fails fast instead of looking like CI flake (.NET parity). Minor (Kushagra + Annie): * m1+m3+m7 — Combined javadoc on CosmosChangeFeedRequestOptionsImpl .setEmptyPagesAllowed/isEmptyPagesAllowed: explains default, paging semantics impact, and the deliberate 'not surfaced on public API' decision. Replaces the PR-body claim with an in-code source of truth. * m2 = Annie Azure#2 — Re-add the bridge accessor wrappers setAllowEmptyPages/getAllowEmptyPages on CosmosChangeFeedRequestOptionsAccessor mirroring the query-side pattern. Restores grep-discoverability and reduces refactor blast radius. The public CosmosChangeFeedRequestOptions API is unchanged (no public setter); friend-API surface only. * m4 = Annie #1 — Add 2 nextPage_endLsnSet_emptyPagesAllowed_* tests exercising branch 1 of nextPageInternal (the completeAfterAllCurrentChangesRetrieved || endLSN != null path), which is the production path Spark's ChangeFeedPartitionReader hits for bounded snapshot reads. * m5 — Add emulator-group end-to-end test CosmosContainerChangeFeedTest.changeFeedQuery_emptyPagesAllowed_ surfacesNoChangesPagesAndTerminates exercising the real FeedRangeCompositeContinuationImpl >4*(size+1) consecutive-304 defense path that mock-based unit tests can't reach (the impl class is package-private by design). * m6 — Shorten the verbose Spark + azure-cosmos CHANGELOG entries and append a brief 'one iterator callback per empty page' trade-off note for operator observability. * Annie Azure#3 = M2 — broader drift hazard closed via the inheritNonContinuationFieldsFrom approach above. Nits: * n1 — Extract the nextPageInternal flatMap body into a private applyNoChangesDecision(FeedResponse<T>) method. Reduces nesting depth from 4 to 2 and improves readability of the contract. * n3 — Replace reflective Fetcher.isFullyDrained invocation with a direct call. The test lives in com.azure.cosmos.implementation.query, same package as Fetcher, so protected access works without reflection. Tests: * ChangeFeedFetcherEmptyPagesTest: 10 tests (was 8), all green * CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest: 6 tests (was 3 — adds endLSN, customSerializer, negative pin), all green * FetcherTest: 5/5 (no regression) * TransientIOErrorsRetryingIteratorSpec: 7/7 * CosmosContainerChangeFeedTest.changeFeedQuery_emptyPagesAllowed_*: new emulator-group test (compiles; runs in CI Test Emulator lane) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The test failed intermittently on macOS / slow CI runners with
'count > 2' failing at count == 2. Root causes:
* .subscribe() is fire-and-forget; Thread.sleep(3000) was the only
synchronization, hoping 3s was enough for async pages to arrive
* Race between the two subscriptions in FULL_FIDELITY mode: line 367
read continuation.get() synchronously without waiting for the first
subscription to populate it (could feed '' to
createForProcessingFromContinuation)
* Slow runners need more than 3s to receive 3 pages of 100 docs at
maxItemCount=10
Fix:
* Replace each .subscribe() + Thread.sleep(3000) pattern with a
CountDownLatch(N) for 'N pages received' + a generous 30s timeout.
Deterministic ordering, no fixed sleep.
* For the bounded .take(2, true) block, switch from fire-and-forget
.subscribe() to .blockLast(Duration.ofSeconds(30)) so the test
waits for the pipeline to complete after exactly 2 pages.
* Dispose subscriptions in finally blocks to avoid leaking pages
between test iterations.
Test intent preserved: count > 2 on the first/resume subscriptions,
count == 2 on the bounded take(2, true) subscription.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| pagedFluxOptions.getRequestContinuation()); | ||
| effectiveRequestOptions.setMaxPrefetchPageCount(this.getMaxPrefetchPageCount()); | ||
| effectiveRequestOptions.setThroughputControlGroupName(this.getThroughputControlGroupName()); | ||
| effectiveRequestOptions.getImpl().inheritNonContinuationFieldsFrom(this.actualRequestOptions); |
There was a problem hiding this comment.
does this mean we had a bug for changeFeed previously where we may not honor exclude regions etc?
There was a problem hiding this comment.
Good question — yes, this rebuild path had a latent bug, but only on a niche entry point. Tracing the callers:
- The internal change-feed pipeline goes through
ChangeFeedOperationStateand the LROUtils.getEffectiveCosmosChangeFeedRequestOptionspaths — both pass aCosmosPagedFluxOptionswhoserequestContinuationis null on the first subscribe, so theif (pagedFluxOptions.getRequestContinuation() != null)branch is skipped andeffectiveRequestOptions = this(the original). No field loss. - The Spark connector builds fresh
CosmosPagedFluxinstances per partition and reads viaiterator()— never hits the rebuild branch. - The only public path that hits the rebuild branch is
container.queryChangeFeed(opts).byPage(savedContinuationToken)(i.e. a caller chooses to resume from a previously-captured token).
Under the old code, that path silently dropped: endLSN, completeAfterAllCurrentChangesRetrieved, excludeRegions, readConsistencyStrategy, diagnosticsThresholds, customSerializer, keywordIdentifiers, maxPrefetchPageCount, throughputControlGroupName, and quotaInfoEnabled — anything not encoded in the continuation token. So yes, a caller resuming a change-feed read via byPage(savedToken) with excludeRegions configured would have silently lost the region pinning until the new page.
The inheritance fix here closes it for free (and the new CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest cases pin it).
Previous de-flake (commit 5683b08) reordered the FULL_FIDELITY insert and the first subscription such that the insert ran BEFORE subscribing. But FULL_FIDELITY uses createForProcessingFromNow, so docs written before the subscription opens are invisible — the first subscription saw zero pages and the new CountDownLatch(3) timed out at 30s. Fix: branch by mode. INCREMENTAL keeps the pre-subscribe insert (since createForProcessingFromBeginning sees pre-existing docs). FULL_FIDELITY inserts AFTER each subscribe (first subscription, resume-from-continuation subscription, and the bounded take(2,true) subscription) so the from-now pipeline actually has writes to consume. Caught by: CosmosContainerChangeFeedTest.asyncChangeFeedPrefetching:385 [first change-feed subscription should produce at least 3 pages within 30 seconds] on Windows TCP Java8 + Java17 emulator runs, FULL_FIDELITY parameter only; INCREMENTAL passed on all platforms. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Tvaron3/spark allow empty pages
…dd spark 4.x changelogs - Renamed change-feed-side flag emptyPagesAllowed -> notModifiedPagesAllowed (and bridge accessor methods setAllowEmptyPages/getAllowEmptyPages -> setAllowNotModifiedPages/getAllowNotModifiedPages) so the name reflects what it actually controls: 304/NotModified pages from sub-partitions. Query-side CosmosQueryRequestOptions.setAllowEmptyPages stays unchanged. Test file renamed via git mv (history preserved). Closes Annie's naming feedback. - Updated ChangeFeedFetcherNotModifiedPagesTest.isFullyDrained_...returnsTrue assertion to match xinlian12's merged simpler isFullyDrained (unconditional noChanges -> true). Was failing CI build 6368298 with 'Expecting value to be false but was true' across all NotFromSource_TestsOnly + EmulatorTCP jobs. - Added missing PR 49276 bugfix entries to azure-cosmos-spark_4-0_2-13 and azure-cosmos-spark_4-1_2-13 CHANGELOGs (both ship the shared azure-cosmos-spark_3 code so the fix lands there). - Stripped IcM 51000001033272 references from test docstrings (per Annie - PR link in CHANGELOG is sufficient internal traceability). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CI build 6368554 failed only on windows2022_EmulatorOnlyIntegrationTestsTcpJava{8,17}
with 'first change-feed subscription should produce at least 3 pages within 30
seconds' on both INCREMENTAL and FULL_FIDELITY parameter sets.
Root cause: the test's TestNG method timeOut was TIMEOUT (40s), but the
deterministic de-flake from a prior session uses 30s firstLatch.await +
another 30s for the FF resume phase + bounded take(.blockLast(30s)). On the
slow Windows EmulatorTcp runner the sum routinely exceeds the method timeout,
and even within each phase the page-arrival cadence sometimes can't deliver
3 pages in 30s.
Fix:
- timeOut = TIMEOUT * 5 (200s) — matches sibling emulator tests on lines 200,
1121, 1167 that also need the longer budget.
- awaitSeconds = 60L — doubles the per-phase wait window so the slow runner
has room to deliver pages.
- retryAnalyzer = FlakyTestRetryAnalyzer.class — consistent with the
changeFeedQueryEndLSNHang test on line 1070; absorbs residual jitter.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…d retry The deterministic CountDownLatch + .blockLast() de-flake I introduced earlier in the session over-corrected: while it eliminates non-determinism in principle, it traded subtle race jitter for slow-runner sensitivity that became a hard failure on Windows EmulatorTcp Java 8 (build 6368807 still failed all 3 retries even with TIMEOUT * 5 and 60s per-phase awaits). The original Thread.sleep(3000) shape had been passing in CI for months and predates this PR entirely — the test is exercising Reactor's byPage prefetch behavior on the change-feed stream, which is unrelated to the notModifiedPagesAllowed work this PR ships. Reverting to that proven shape is the lowest-risk path; retryAnalyzer = FlakyTestRetryAnalyzer is the only addition (consistent with sibling change-feed tests) to absorb the residual slow-runner jitter that remains in the original. Removed the now-unused CountDownLatch + TimeUnit imports. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The 3-line comment was xinlian12's reasoning note explaining the need for the setFeedResponseContinuationToken call directly below it. The token re-stamp already addresses the concern (re-stamps with this.changeFeedState.toString() so the surfaced empty page carries the post-rotation cursor), so the speculative wording reads as a stale TODO. Removed. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
/azp run java - cosmos - spark |
|
Azure Pipelines will not run the associated pipelines, because the pull request was updated after the run command was issued. Review the pull request again and issue a new run command. |
|
/azp run java - cosmos - spark |
|
Azure Pipelines successfully started running 1 pipeline(s). |
|
/azp run java - cosmos - spark |
|
Azure Pipelines successfully started running 1 pipeline(s). |
Summary
Spark partition tasks reading change feed (or executing cross-partition queries) against a sparse workload could hit
OperationCancelledException("End-to-end timeout hit when trying to retrieve the next page") at the connector's 65-second per-operation end-to-end timeout.Root cause: with the default
emptyPagesAllowed=false,ParallelDocumentQueryExecutionContextandChangeFeedFetcherswallow empty / 304 pages internally — a single producer-sidenextPage()call can keep draining many sub-feedRanges before emitting one non-empty page. For sparse cross-partition workloads the cumulative wall time blows the per-operation timeout.Fix
Spark
ItemsPartitionReader(query path) callssetAllowEmptyPages(true)onCosmosQueryRequestOptionsso the SDK's existing emptyPagesAllowed plumbing applies — each physical page surfaces independently and is bound by the per-page timeout window rather than the per-operation one.New internal-only
emptyPagesAllowedflag onCosmosChangeFeedRequestOptionsImpl(defaultfalse— behavior unchanged for all other callers). Plumbed throughPaginator.getChangeFeedQueryResultAsObservableintoChangeFeedFetcher.nextPageInternal. When the flag is true, both 304/noChangesbranches returnMono.just(r)so empty pages bubble up to the iterator. The flag is reached via the existingaccessor.getImpl(options).setEmptyPagesAllowed(...)escape hatch — no new bridge accessor wrappers are added (see commit34d3da4for the rationale on keeping the accessor surface small for internal-only flags).ChangeFeedFetcher.isFullyDrainedcorrectness fix. The original cleanup (consult onlycontinuation.isDone()) regressed every non-Spark caller becauseFeedRangeCompositeContinuationImpl.isDone()returnscompositeContinuationTokens.size() == 0, which is permanentlyfalsefor incremental change feed (moveToNextToken()rotates the deque, never shrinks it). ThenoChanges → trueshort-circuit is now gated on!emptyPagesAllowed, preserving the original termination signal for every default-mode caller while still letting Spark surface empty pages.Defense-in-depth
NO_RETRYarm. WithemptyPagesAllowed=trueand a streaming change feed (noendLSN), ifhandleChangeFeedNotModifiedreturnsNO_RETRYon a noChanges page (single-partition case, multi-partition full-cycle complete, or the>4*(size+1)consecutive-304 defense inFeedRangeCompositeContinuationImpl), the SDK's built-in termination signal would otherwise be silently dropped.nextPageInternalnow explicitly callsdisableShouldFetchMore()when the SDK signals NO_RETRY on a noChanges page so the defense-in-depth termination guarantee survives for the new flag too. ThenoChanges(r) && emptyPagesAllowedguard is load-bearing becausehandleChangeFeedNotModifiedreturnsNO_RETRYfor every data page too — without the guard, every data page would silently truncate iteration.Spark
ChangeFeedPartitionReaderopts into the new flag viaaccessor.getImpl(options).setEmptyPagesAllowed(true).CosmosChangeFeedRequestOptions.withCosmosPagedFluxOptionsalso propagatesemptyPagesAllowedwhen the paged-flux pull mechanism supplies a continuation token (the freshly-built impl would otherwise silently lose the flag — added a comment flagging the broader drift hazard in that copy list).DRY: extracted the two near-identical
surface or swallow via repeatWhenEmptyblocks into asurfaceOrSwallowNoChangesPage(r)helper.Behavior matrix
nextPage()blocks > 65 s, throws OCEemptyPagesAlloweddefaults tofalse)