Share per-chunk JoinLeftData across right partitions in NLJ memory-limited fallback #22038

Open
viirya wants to merge 4 commits into apache:main from viirya:nlj-multi-partition-unmatched-fix

@viirya (Member) commented May 6, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

When target_partitions > 1, the memory-limited fallback path was building a per-output-partition JoinLeftData with AtomicUsize::new(1) for each left chunk, so each partition emitted unmatched left rows based only on its own right-side matches. For LEFT, FULL, LEFT SEMI, LEFT ANTI, and LEFT MARK, this produced wrong results (duplicate unmatched rows; for LEFT SEMI, duplicate matched rows when multiple right partitions matched the same left row).
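The failure mode can be reproduced in miniature. The sketch below is not DataFusion code: `LeftChunk`, `probe`, `unmatched_per_partition`, and `unmatched_shared` are hypothetical names, and the struct is a simplified stand-in for `JoinLeftData` that keeps only a visited bitmap and a probe counter. It assumes each right partition reports the left row indices it matched.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified stand-in for the per-chunk state (hypothetical, not the real `JoinLeftData`).
struct LeftChunk {
    visited: Vec<AtomicBool>,
    probe_threads_counter: AtomicUsize,
}

impl LeftChunk {
    fn new(rows: usize, probers: usize) -> Self {
        Self {
            visited: (0..rows).map(|_| AtomicBool::new(false)).collect(),
            probe_threads_counter: AtomicUsize::new(probers),
        }
    }

    /// Mark this partition's matches, then return the unmatched left rows
    /// only if the caller is the last prober for the chunk.
    fn probe(&self, matches: &[usize]) -> Vec<usize> {
        for &i in matches {
            self.visited[i].store(true, Ordering::Relaxed);
        }
        if self.probe_threads_counter.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.visited
                .iter()
                .enumerate()
                .filter(|(_, v)| !v.load(Ordering::Relaxed))
                .map(|(i, _)| i)
                .collect()
        } else {
            Vec::new()
        }
    }
}

/// Buggy shape: every partition builds its own chunk state with a counter of 1,
/// so every partition believes it is the last prober.
fn unmatched_per_partition(rows: usize, matches: &[Vec<usize>]) -> Vec<usize> {
    let mut out = Vec::new();
    for m in matches {
        let chunk = LeftChunk::new(rows, 1);
        out.extend(chunk.probe(m));
    }
    out.sort();
    out
}

/// Fixed shape: one shared chunk state whose counter equals the number of
/// right partitions, so only the final prober emits unmatched rows.
fn unmatched_shared(rows: usize, matches: &[Vec<usize>]) -> Vec<usize> {
    let chunk = Arc::new(LeftChunk::new(rows, matches.len()));
    let mut out = Vec::new();
    for m in matches {
        let chunk = Arc::clone(&chunk);
        out.extend(chunk.probe(m));
    }
    out.sort();
    out
}
```

With four left rows and two right partitions matching rows 0 and 1 respectively, the per-partition variant reports rows 0 and 1 as unmatched and rows 2 and 3 twice, while the shared variant reports only rows 2 and 3 once.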

What changes are included in this PR?

This change introduces a plan-level FallbackCoordinator that:

  • Owns the left spill stream and a single chunk-sized MemoryReservation,
  • Has the first partition reaching a chunk become its "leader": it loads the chunk and publishes an Arc<JoinLeftData> (with probe_threads_counter == right_partition_count) into a shared slot,
  • Lets every other right partition take an Arc clone of the same JoinLeftData, so the visited bitmap and probe-thread counter are shared exactly as in the single-pass collect_left_input path,
  • Releases the slot only after the partition that brings the counter to zero finishes emitting unmatched left rows for the chunk, then notifies waiters so the next chunk can be loaded.
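The leader/follower protocol in the list above can be sketched with standard-library primitives. This is an illustration under stated assumptions, not the PR's implementation: it uses OS threads with a `Mutex` and `Condvar` instead of async futures, it does not model the spill stream or the `MemoryReservation`, and the names `Coordinator`, `Slot`, `Chunk`, `acquire`, `finish`, and `run` are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Stand-in for one loaded left chunk (hypothetical; the PR shares `JoinLeftData`).
struct Chunk {
    index: usize,
    probe_threads_counter: AtomicUsize,
}

/// Shared slot holding the chunk currently in flight.
struct Slot {
    current: Option<Arc<Chunk>>,
    next_index: usize, // index of the next chunk that still has to be loaded
    loads: usize,      // how many times a leader loaded a chunk
}

struct Coordinator {
    slot: Mutex<Slot>,
    changed: Condvar,
    partitions: usize,
    total_chunks: usize,
}

impl Coordinator {
    /// Get chunk `want`: the first caller loads it (leader), later callers
    /// clone the same `Arc` (followers), and callers that are ahead wait
    /// until the previous chunk has been released.
    fn acquire(&self, want: usize) -> Option<Arc<Chunk>> {
        if want >= self.total_chunks {
            return None;
        }
        let mut slot = self.slot.lock().unwrap();
        loop {
            let in_flight = slot.current.as_ref().map(|c| c.index);
            match in_flight {
                Some(i) if i == want => return slot.current.clone(),
                None if slot.next_index == want => {
                    // Leader: "load" the chunk while holding the lock.
                    let chunk = Arc::new(Chunk {
                        index: want,
                        probe_threads_counter: AtomicUsize::new(self.partitions),
                    });
                    slot.current = Some(Arc::clone(&chunk));
                    slot.next_index += 1;
                    slot.loads += 1;
                    return Some(chunk);
                }
                _ => slot = self.changed.wait(slot).unwrap(),
            }
        }
    }

    /// Called once per partition per chunk. Returns true for the partition
    /// that brings the counter to zero; that partition would emit unmatched
    /// left rows before the slot is released and waiters are notified.
    fn finish(&self, chunk: &Chunk) -> bool {
        let last = chunk.probe_threads_counter.fetch_sub(1, Ordering::AcqRel) == 1;
        if last {
            let mut slot = self.slot.lock().unwrap();
            slot.current = None;
            self.changed.notify_all();
        }
        last
    }
}

/// Drive `partitions` threads over `total_chunks` chunks and report
/// (number of chunk loads, number of "emit unmatched rows" events).
fn run(partitions: usize, total_chunks: usize) -> (usize, usize) {
    let coord = Arc::new(Coordinator {
        slot: Mutex::new(Slot { current: None, next_index: 0, loads: 0 }),
        changed: Condvar::new(),
        partitions,
        total_chunks,
    });
    let emits = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..partitions)
        .map(|_| {
            let coord = Arc::clone(&coord);
            let emits = Arc::clone(&emits);
            thread::spawn(move || {
                let mut want = 0;
                while let Some(chunk) = coord.acquire(want) {
                    if coord.finish(&chunk) {
                        emits.fetch_add(1, Ordering::Relaxed);
                    }
                    want += 1;
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let loads = coord.slot.lock().unwrap().loads;
    (loads, emits.load(Ordering::Relaxed))
}
```

Calling `run(4, 3)` in this model loads each of the three chunks exactly once and produces exactly one unmatched-row emission per chunk, regardless of how the four threads interleave.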

The per-chunk in-flight fetch and release are driven through BoxFuture fields on SpillStateActive, polled across poll_next iterations.
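The general pattern of keeping an in-flight boxed future in a state struct and resuming it on later polls might look like the following. This is a std-only sketch under assumptions: `BoxFut` spells out the same shape as `futures::future::BoxFuture<'static, T>`, and `ActiveState`, `pending_fetch`, `YieldOnce`, and `drive` are hypothetical names rather than the fields or types on `SpillStateActive`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Std-only spelling of a boxed, sendable future.
type BoxFut<T> = Pin<Box<dyn Future<Output = T> + Send>>;

/// Stand-in for an async chunk fetch: pending on the first poll, ready on the second.
struct YieldOnce {
    yielded: bool,
    value: u32,
}

impl Future for YieldOnce {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.yielded {
            Poll::Ready(self.value)
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical stream state that parks an in-flight fetch between polls.
struct ActiveState {
    pending_fetch: Option<BoxFut<u32>>,
    next_chunk: u32,
    total_chunks: u32,
}

impl ActiveState {
    /// Hand-rolled analogue of `Stream::poll_next`.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.pending_fetch.is_none() {
            if self.next_chunk == self.total_chunks {
                return Poll::Ready(None);
            }
            // Start a new fetch and store it so a later poll can resume it.
            let fut: BoxFut<u32> = Box::pin(YieldOnce {
                yielded: false,
                value: self.next_chunk,
            });
            self.pending_fetch = Some(fut);
            self.next_chunk += 1;
        }
        let fut = self.pending_fetch.as_mut().expect("set above");
        let polled = fut.as_mut().poll(cx);
        match polled {
            Poll::Ready(chunk) => {
                // Fetch finished: clear the field and yield the chunk.
                self.pending_fetch = None;
                Poll::Ready(Some(chunk))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for a busy-polling demo loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll the state to completion; returns the chunks seen and the number of
/// polls that returned `Pending`.
fn drive(total_chunks: u32) -> (Vec<u32>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut state = ActiveState { pending_fetch: None, next_chunk: 0, total_chunks };
    let mut chunks = Vec::new();
    let mut pendings = 0;
    loop {
        match state.poll_next(&mut cx) {
            Poll::Ready(Some(c)) => chunks.push(c),
            Poll::Ready(None) => break,
            Poll::Pending => pendings += 1,
        }
    }
    (chunks, pendings)
}
```

Storing the future in the struct is what lets a `Pending` result return control to the caller without losing the fetch; the next `poll_next` call picks up the same future instead of starting a new one.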

The FULL-join multi-partition guard added in #21833 is removed; FULL joins now use the shared coordination path.

Discussed in #21833 (comment) and #21833 (comment).

Are these changes tested?

Yes. This PR adds five multi-partition correctness tests and multi-partition NLJ spill SLT cases. test_overallocation is updated to expect a multi-partition FULL join to spill rather than OOM.

Are there any user-facing changes?

No

viirya added 4 commits May 5, 2026 17:38
Adds five tests reproducing the cross-partition coordination bug in
NestedLoopJoinExec's memory-limited fallback path: each output
partition independently constructs a per-chunk JoinLeftData with
AtomicUsize::new(1), so the left visited bitmap and probe-thread
counter are not shared across right partitions. For LEFT, FULL,
LEFT SEMI, LEFT ANTI, and LEFT MARK, this causes wrong results when
target_partitions > 1 (duplicate unmatched rows; for LEFT SEMI,
duplicate matched rows when multiple right partitions match the same
left row).

The tests are #[ignore]'d for now (with reason) and re-enabled in a
follow-up commit that implements the shared-state fix. Discussed in
apache#21833 (comment)
and apache#21833 (comment)

Co-authored-by: Claude Code

…ons in NLJ memory-limited fallback

When `target_partitions > 1`, the memory-limited fallback path was building
a per-output-partition `JoinLeftData` with `AtomicUsize::new(1)` for each
left chunk, so each partition emitted unmatched left rows based only on
its own right-side matches. For LEFT, FULL, LEFT SEMI, LEFT ANTI, and
LEFT MARK, this produced wrong results (duplicate unmatched rows; for
LEFT SEMI, duplicate matched rows when multiple right partitions matched
the same left row).

This change introduces a plan-level `FallbackCoordinator` that:
- Owns the left spill stream and a single chunk-sized `MemoryReservation`,
- Has the first partition reaching a chunk become its "leader": it loads
  the chunk and publishes an `Arc<JoinLeftData>` (with
  `probe_threads_counter == right_partition_count`) into a shared slot,
- Lets every other right partition take an `Arc` clone of the same
  `JoinLeftData`, so the visited bitmap and probe-thread counter are
  shared exactly as in the single-pass `collect_left_input` path,
- Releases the slot only after the partition that brings the counter
  to zero finishes emitting unmatched left rows for the chunk, then
  notifies waiters so the next chunk can be loaded.

The per-chunk in-flight fetch and release are driven through `BoxFuture`
fields on `SpillStateActive`, polled across `poll_next` iterations.

The FULL-join multi-partition guard added in apache#21833 is removed; FULL
joins now use the shared coordination path. The five
`#[ignore]`-d multi-partition correctness tests added in the previous
commit are unignored and now pass. `test_overallocation` is updated to
expect FULL multi-partition to spill (not OOM).

Discussed in
apache#21833 (comment)
and
apache#21833 (comment)

Co-authored-by: Claude Code

Adds end-to-end SLT cases under target_partitions=4 + tight
memory_limit, covering LEFT, FULL, LEFT SEMI, and LEFT ANTI joins.
Each query uses a non-equi predicate that forces NLJ and verifies
that left-row counts (matched/unmatched) match the single-partition
expectation, exercising the cross-partition shared-state path
introduced in the previous commit.

Co-authored-by: Claude Code

The previous SLT cases verified output correctness under
target_partitions=4 + tight memory_limit, but did not assert that
the memory-limited fallback path was actually taken. Add an
EXPLAIN ANALYZE assertion that the NestedLoopJoinExec line shows
spill_count=2, confirming both the left-side and right-side spills
fired.

Co-authored-by: Claude Code
@github-actions bot added the sqllogictest (SQL Logic Tests (.slt)) and physical-plan (Changes to the physical-plan crate) labels on May 6, 2026