Skip to content

stream: optimize gather hot path and polish for upstream#2848

Draft
He-Pin wants to merge 6 commits intoapache:mainfrom
He-Pin:hepin-gather-statefulmap-coverage-v2
Draft

stream: optimize gather hot path and polish for upstream#2848
He-Pin wants to merge 6 commits intoapache:mainfrom
He-Pin:hepin-gather-statefulmap-coverage-v2

Conversation

@He-Pin
Copy link
Copy Markdown
Member

@He-Pin He-Pin commented Apr 6, 2026

Summary

This PR applies deep code review and optimizations on top of the gather operator implementation. All changes are targeted at making the contribution meet Apache Pekko CONTRIBUTING standards.

Hot path optimizations (Ops.scala)

  • Eliminate hasCallbackFirst boolean field — use callbackFirst eq null as sentinel, saving 2 writes + 1 read per element
  • Inline pushCallbackSingle into onPushSingle — remove 1 virtual dispatch per element on the common path
  • Extract cold afterPushFinalAction() — so the JIT optimises the hot branch independently
  • Remove hadContext parameter from maybeRunFinalAction/pushPending* — check needInvokeOnCompleteCallback directly
  • Reset multiMode = false after overflow is fully drained, so subsequent single-output calls return to the fast path

Documentation & API polish

  • Fix @since version: unify to 1.3.0 across all DSL files (was inconsistently 2.0.0)
  • Fix Java docs examples: use collector.push() instead of collector.apply() (idiomatic API)
  • Rewrite gather.md to remove internal PR references and add third example (distinctUntilChanged)
  • Improve Scala/Java docs examples with bufferUntilChanged and distinctUntilChanged patterns
  • Remove internal Chinese evaluation doc (not appropriate for upstream)

Test & build fixes

  • Fix distinctUntilChanged test: remove dead lastElement = Some(elem) in the duplicate case
  • Fix missing Keep and Flow imports in FlowGatherSpec
  • Use explicit imports in ZipWithIndexBenchmark (avoid wildcard import)

Test plan

  • All 30 FlowGatherSpec tests pass
  • Java gather tests pass
  • stream/compile succeeds
  • Commit message follows Apache Pekko convention (stream: ...)

He-Pin and others added 6 commits March 29, 2026 23:46
Motivation: add the gather operator across the Scala and Java DSLs, document it, tighten its execution semantics, and broaden its statefulMap-equivalent coverage.

Modification: implement the gather stage and DSL wiring, add Scala/Java/docs examples and tests, add JMH coverage, optimize the public gather hot path, and fix the one-to-one backpressure bug found in review.

Result: gather is now documented and verified end-to-end with stronger semantics, broader regression coverage, and near-parity zipWithIndex performance against statefulMap.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Motivation: follow-up review and documentation work for the new gather operator.

Modification: correct the new gather API @SInCE annotations to 2.0.0 and document how gather coverage aligns with statefulMap semantics while differing on internal null-state handling.

Result: the published PR now reflects the intended release version and explains the remaining semantic differences more clearly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Eliminate `hasCallbackFirst` boolean field — use `callbackFirst eq null`
  as sentinel (saves 2 writes + 1 read per element on the hot path)
- Inline `pushCallbackSingle` into `onPushSingle` — remove 1 virtual
  dispatch per element
- Extract cold `afterPushFinalAction()` so the JIT optimises the hot
  branch independently
- Remove `hadContext` parameter from `maybeRunFinalAction`/`pushPending*`
  — check `needInvokeOnCompleteCallback` directly
- Reset `multiMode = false` after overflow is fully drained so subsequent
  single-output calls return to the fast path
- Fix `@since` version: unify to 1.3.0 across all DSL files
- Fix Java docs examples: use `collector.push()` instead of
  `collector.apply()` (idiomatic API usage)
- Fix distinctUntilChanged test: remove dead `lastElement = Some(elem)`
  in the duplicate case
- Remove `hasPending` helper — inline to `hasPendingFirst` (was only
  called in 2 places, both already check the field directly)
- Use explicit imports in ZipWithIndexBenchmark (avoid wildcard import)
- Fix missing `Keep` and `Flow` imports in FlowGatherSpec
- Rewrite gather.md documentation to remove internal PR references and
  add third example (distinctUntilChanged)
- Improve Scala/Java docs examples with `bufferUntilChanged` and
  `distinctUntilChanged` patterns
- Remove internal Chinese evaluation doc (not appropriate for upstream)

🤖 Generated with [Qoder][https://qoder.com]
- Add Gatherers.oneToOne() factory methods for Java DSL hot path access
- Fix singleCollector.push to correctly handle 3+ outputs per gather call
- Null out pendingOverflow on restart to prevent memory accumulation
- Add null check on factory result to catch invalid factories early
- Expand SubFlow/SubSource gather documentation for Java DSL
- Align Scala/Java DSL documentation language
- Add tests: materialization independence, empty upstream,
  onComplete null emission, multi-output backpressure

🤖 Generated with [Qoder](https://qoder.com)
@He-Pin He-Pin marked this pull request as draft April 6, 2026 20:16
*
* '''Cancels when''' downstream cancels
*
* @since 1.3.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect change

* The collector is only valid while the current [[Gatherer]] callback is running.
* Emitted elements MUST NOT be `null`.
*
* @since 1.3.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2.0.0

* A new gatherer instance is created for each materialization and on each supervision restart.
* It can keep mutable state in fields or via captured variables.
*
* @since 1.3.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2.0.0

* the `onComplete` callback.
*
* @param f the one-to-one transformation function
* @since 1.3.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the 1.3.0 in this class or elsewhere in this PR should be 2.0.0

/**
* Create a `Source` from an `Optional` value, emitting the value if it is present.
*
* @since 1.3.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect change

*
* '''Cancels when''' downstream cancels
*
* @since 1.3.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all of these 1.3.0s are wrong - should be 2.0.0

* '''Cancels when''' downstream cancels
*
* @since 1.3.0
* @since 2.0.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incorrect change

* '''Cancels when''' downstream cancels
*
* @since 1.3.0
* @since 2.0.0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are all the mapOptions being changed? Change back to 1.3.0

@pjfanning pjfanning added this to the 2.0.0-M2 milestone Apr 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants