stream: optimize gather hot path and polish for upstream #2848
Draft
He-Pin wants to merge 6 commits into apache:main from
Conversation
Motivation: add the gather operator across the Scala and Java DSLs, document it, tighten its execution semantics, and broaden its statefulMap-equivalent coverage.
Modification: implement the gather stage and DSL wiring, add Scala/Java/docs examples and tests, add JMH coverage, optimize the public gather hot path, and fix the one-to-one backpressure bug found in review.
Result: gather is now documented and verified end-to-end with stronger semantics, broader regression coverage, and near-parity zipWithIndex performance against statefulMap.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Motivation: follow-up review and documentation work for the new gather operator.
Modification: correct the new gather API `@since` annotations to 2.0.0 and document how gather coverage aligns with statefulMap semantics while differing on internal null-state handling.
Result: the published PR now reflects the intended release version and explains the remaining semantic differences more clearly.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Eliminate `hasCallbackFirst` boolean field: use `callbackFirst eq null` as sentinel (saves 2 writes + 1 read per element on the hot path)
- Inline `pushCallbackSingle` into `onPushSingle`: remove 1 virtual dispatch per element
- Extract cold `afterPushFinalAction()` so the JIT optimises the hot branch independently
- Remove `hadContext` parameter from `maybeRunFinalAction`/`pushPending*`: check `needInvokeOnCompleteCallback` directly
- Reset `multiMode = false` after overflow is fully drained so subsequent single-output calls return to the fast path
- Fix `@since` version: unify to 1.3.0 across all DSL files
- Fix Java docs examples: use `collector.push()` instead of `collector.apply()` (idiomatic API usage)
- Fix distinctUntilChanged test: remove dead `lastElement = Some(elem)` in the duplicate case
- Remove `hasPending` helper: inline to `hasPendingFirst` (was only called in 2 places, both already check the field directly)
- Use explicit imports in ZipWithIndexBenchmark (avoid wildcard import)
- Fix missing `Keep` and `Flow` imports in FlowGatherSpec
- Rewrite gather.md documentation to remove internal PR references and add third example (distinctUntilChanged)
- Improve Scala/Java docs examples with `bufferUntilChanged` and `distinctUntilChanged` patterns
- Remove internal Chinese evaluation doc (not appropriate for upstream)

🤖 Generated with [Qoder](https://qoder.com)
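The first bullet's sentinel idea can be sketched in isolation. This is a minimal illustration, not the code in Ops.scala: the class `SingleSlot` and its methods are hypothetical names. It relies on the rule quoted in the review diff below that emitted elements must not be `null`, which is what makes `null` safe to use as the "empty" marker.

```java
// Hypothetical single-element buffer; names are illustrative only
// and are not taken from the PR's Ops.scala.
final class SingleSlot<T> {
    // null means "empty", so no separate boolean flag has to be
    // written and read for every element.
    private T first;

    boolean isEmpty() {
        return first == null;
    }

    void put(T elem) {
        // Rejecting null keeps the sentinel unambiguous.
        if (elem == null) throw new NullPointerException("elements must not be null");
        if (first != null) throw new IllegalStateException("slot already occupied");
        first = elem;
    }

    // Returns the buffered element and clears the slot; returns null when empty.
    T take() {
        T elem = first;
        first = null;
        return elem;
    }
}
```

Compared with a `boolean` flag plus a field, each `put`/`take` pair touches one field instead of two, which is the kind of saving the commit message describes.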
- Add Gatherers.oneToOne() factory methods for Java DSL hot path access
- Fix singleCollector.push to correctly handle 3+ outputs per gather call
- Null out pendingOverflow on restart to prevent memory accumulation
- Add null check on factory result to catch invalid factories early
- Expand SubFlow/SubSource gather documentation for Java DSL
- Align Scala/Java DSL documentation language
- Add tests: materialization independence, empty upstream, onComplete null emission, multi-output backpressure

🤖 Generated with [Qoder](https://qoder.com)
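To make the "3+ outputs" and overflow bullets concrete, here is a rough sketch of a collector that keeps one element on a fast path and spills further elements into a lazily allocated queue. It is written under assumptions: `OverflowCollector`, `poll`, and `reset` are hypothetical names, and the real stage may order or store pending elements differently.

```java
import java.util.ArrayDeque;

// Hypothetical collector; not the implementation from the PR.
final class OverflowCollector<T> {
    private T first;                 // fast path: one pending element, null = empty
    private ArrayDeque<T> overflow;  // allocated only when a second element arrives
    private boolean multiMode;       // true while overflow still holds elements

    void push(T elem) {
        if (elem == null) throw new NullPointerException("emitted elements must not be null");
        if (first == null && !multiMode) {
            first = elem;            // single-output fast path
        } else {
            // Second, third, and later outputs all land here, in order.
            if (overflow == null) overflow = new ArrayDeque<>();
            overflow.addLast(elem);
            multiMode = true;
        }
    }

    // Returns the next pending element in push order, or null when drained.
    T poll() {
        T elem;
        if (first != null) {
            elem = first;
            first = null;
        } else if (overflow != null) {
            elem = overflow.pollFirst();
        } else {
            elem = null;
        }
        // Once everything is drained, return to the fast path.
        if (overflow == null || overflow.isEmpty()) multiMode = false;
        return elem;
    }

    boolean isMultiMode() {
        return multiMode;
    }

    // Assumed restart hook: drop the queue so it cannot accumulate memory.
    void reset() {
        first = null;
        overflow = null;
        multiMode = false;
    }
}
```

While `multiMode` is set, new elements go to the queue even if the fast-path slot is free, which preserves emission order; the flag clears only after the queue is empty.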
pjfanning
requested changes
Apr 7, 2026
 *
 * '''Cancels when''' downstream cancels
 *
 * @since 1.3.0
 * The collector is only valid while the current [[Gatherer]] callback is running.
 * Emitted elements MUST NOT be `null`.
 *
 * @since 1.3.0
 * A new gatherer instance is created for each materialization and on each supervision restart.
 * It can keep mutable state in fields or via captured variables.
 *
 * @since 1.3.0
 * the `onComplete` callback.
 *
 * @param f the one-to-one transformation function
 * @since 1.3.0
Member
All the 1.3.0 values in this class, and elsewhere in this PR, should be 2.0.0.
 /**
 * Create a `Source` from an `Optional` value, emitting the value if it is present.
 *
 * @since 1.3.0
 *
 * '''Cancels when''' downstream cancels
 *
 * @since 1.3.0
Member
All of these 1.3.0s are wrong; they should be 2.0.0.
   * '''Cancels when''' downstream cancels
   *
-  * @since 1.3.0
+  * @since 2.0.0
   * '''Cancels when''' downstream cancels
   *
-  * @since 1.3.0
+  * @since 2.0.0
Member
Why are all the mapOptions being changed? Change them back to 1.3.0.
Summary
This PR applies deep code review and optimizations on top of the `gather` operator implementation. All changes are targeted at making the contribution meet Apache Pekko CONTRIBUTING standards.

Hot path optimizations (Ops.scala)
- Eliminate `hasCallbackFirst` boolean field: use `callbackFirst eq null` as sentinel, saving 2 writes + 1 read per element
- Inline `pushCallbackSingle` into `onPushSingle`: remove 1 virtual dispatch per element on the common path
- Extract cold `afterPushFinalAction()` so the JIT optimises the hot branch independently
- Remove `hadContext` parameter from `maybeRunFinalAction`/`pushPending*`: check `needInvokeOnCompleteCallback` directly
- Reset `multiMode = false` after overflow is fully drained, so subsequent single-output calls return to the fast path

Documentation & API polish
- Fix `@since` version: unify to `1.3.0` across all DSL files (was inconsistently `2.0.0`)
- Fix Java docs examples: use `collector.push()` instead of `collector.apply()` (idiomatic API)
- Rewrite `gather.md` to remove internal PR references and add third example (distinctUntilChanged)
- Improve Scala/Java docs examples with `bufferUntilChanged` and `distinctUntilChanged` patterns

Test & build fixes
- Fix `distinctUntilChanged` test: remove dead `lastElement = Some(elem)` in the duplicate case
- Fix missing `Keep` and `Flow` imports in `FlowGatherSpec`
- Use explicit imports in `ZipWithIndexBenchmark` (avoid wildcard import)

Test plan
- `stream/compile` succeeds
- stream: ...)
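
For readers unfamiliar with the `distinctUntilChanged` pattern mentioned above, the following is a self-contained sketch of the idea using `collector.push()`. The interfaces `SketchCollector` and `SketchGatherer` and the driver `GatherSketch.run` are hypothetical stand-ins written for this illustration; they are not the Pekko API, and the real signatures may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; not the Pekko API.
interface SketchCollector<T> {
    void push(T elem);
}

interface SketchGatherer<I, O> {
    void apply(I elem, SketchCollector<O> collector);

    default void onComplete(SketchCollector<O> collector) {}
}

// Emits an element only when it differs from the previous one.
final class DistinctUntilChanged<T> implements SketchGatherer<T, T> {
    private T last; // null until the first element arrives

    @Override
    public void apply(T elem, SketchCollector<T> collector) {
        if (!elem.equals(last)) {
            last = elem;          // state changes only when the element changes
            collector.push(elem);
        }
        // Duplicate case: nothing is emitted and no state update is needed.
    }
}

final class GatherSketch {
    // Drives a gatherer over a list, standing in for a stream stage.
    static <I, O> List<O> run(List<I> input, SketchGatherer<I, O> gatherer) {
        List<O> out = new ArrayList<>();
        SketchCollector<O> collector = out::add;
        for (I elem : input) {
            gatherer.apply(elem, collector);
        }
        gatherer.onComplete(collector);
        return out;
    }
}
```

Under these assumptions, `GatherSketch.run(List.of(1, 1, 2, 2, 2, 3, 1), new DistinctUntilChanged<Integer>())` yields `[1, 2, 3, 1]`. The empty duplicate branch mirrors the test fix listed above: re-assigning the last element there would be dead code.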