[refactor](local shuffle) Move local exchange planning from BE to FE#63366

Open
924060929 wants to merge 1 commit into master from fe_local_shuffle_rebase3
@924060929
Contributor

@924060929 924060929 commented May 18, 2026

What problem does this PR solve?

Issue Number: None

Related PR: None

Problem Summary:

Move local exchange (LE) planning from BE's _plan_local_exchange (pipeline build time) to a new FE-side planner. The FE planner mirrors BE semantics, brings several correctness fixes, and is gated by a session variable so the legacy BE path stays available as a fallback.

Core design

  • New AddLocalExchange pass runs after DistributePlanner, walking each fragment's plan tree bottom-up via the polymorphic PlanNode.enforceAndDeriveLocalExchange(). Each node declares what distribution it requires of its children; the framework inserts LocalExchangeNode where needed.
  • LocalExchangeNode represents intra-fragment data redistribution and supports PASSTHROUGH, GLOBAL/LOCAL/BUCKET HASH_SHUFFLE, BROADCAST, PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, NOOP.
  • Per-BE instance semantics: maxPerBeInstances (max pipeline instances assigned to any single BE) is used instead of global instance count to match BE's _num_instances check. Planning is a no-op when maxPerBeInstances == 1.
  • Serial → non-serial fan-out: when a serial operator feeds a non-serial parent without an intermediate LE, the framework inserts a PASSTHROUGH LE to restore N-task parallelism, matching BE's required_data_distribution() rule.
  • Requirement-based exchange type resolution via LocalExchangeTypeRequire: RequireHash adapts to any hash flavour, RequireSpecific preserves the exact requested type.
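A minimal sketch of the bottom-up walk described in the list above, written as a toy model rather than the PR's actual code. The class `AddLocalExchangeSketch`, its `Node` type, and the three-value `Dist` enum are hypothetical simplifications; the real planner works on `PlanNode` and `LocalExchangeNode` and tracks partition keys, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of a bottom-up enforce-and-derive pass. All names are hypothetical. */
final class AddLocalExchangeSketch {
    enum Dist { NOOP, PASSTHROUGH, HASH }

    static final class Node {
        final String name;
        final Dist requireOfChildren; // distribution this node needs from each child
        final Dist output;            // distribution this node produces
        final boolean serial;         // true if the node runs as a single task
        final List<Node> children = new ArrayList<>();

        Node(String name, Dist requireOfChildren, Dist output, boolean serial, Node... kids) {
            this.name = name;
            this.requireOfChildren = requireOfChildren;
            this.output = output;
            this.serial = serial;
            Collections.addAll(children, kids);
        }
    }

    static Node localExchange(Dist type, Node child) {
        return new Node("LocalExchange(" + type + ")", Dist.NOOP, type, false, child);
    }

    /** Visits children first, then inserts an exchange wherever a requirement is unmet. */
    static Node enforce(Node node, int maxPerBeInstances) {
        if (maxPerBeInstances == 1) {
            return node; // one instance per BE: planning is a no-op
        }
        for (int i = 0; i < node.children.size(); i++) {
            Node child = enforce(node.children.get(i), maxPerBeInstances);
            if (node.requireOfChildren == Dist.HASH && child.output != Dist.HASH) {
                child = localExchange(Dist.HASH, child);
            } else if (child.serial && !node.serial) {
                // Serial child feeding a non-serial parent: restore N-task parallelism.
                child = localExchange(Dist.PASSTHROUGH, child);
            }
            node.children.set(i, child);
        }
        return node;
    }

    static String render(Node n) {
        if (n.children.isEmpty()) {
            return n.name;
        }
        StringBuilder sb = new StringBuilder(n.name).append('[');
        for (int i = 0; i < n.children.size(); i++) {
            sb.append(i == 0 ? "" : ", ").append(render(n.children.get(i)));
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        Node scan = new Node("Scan", Dist.NOOP, Dist.NOOP, true);
        Node agg = new Node("Agg", Dist.HASH, Dist.HASH, false, scan);
        System.out.println(render(enforce(agg, 4)));
    }
}
```

In this model a serial scan under a hash-requiring aggregate gets a HASH exchange, a serial scan under a non-serial parent with no requirement gets a PASSTHROUGH exchange, and nothing is inserted when `maxPerBeInstances` is 1.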

AggregationNode correctness fixes (DORIS-25413)

PR #62438 introduced a semantic split for required_data_distribution=HASH: correctness-required vs performance-only. In AggSinkOperatorX and DistinctStreamingAggOperatorX, BE's early return (!_needs_finalize && !enable_local_exchange_before_agg → base) conflates the two intents. It wrongly catches FIRST_MERGE and non-streaming dedup, both of which need HASH for correctness, and yields PASSTHROUGH over a serial child, which produces wrong aggregation results. The FE planner adds the missing guards (!isMerge() for AggSink, useStreamingPreagg=true for DistinctStreamingAgg) so that FIRST_MERGE and non-streaming dedup always emit HASH, regardless of the flag. It also adds requiresShuffleForCorrectness(), which mirrors BE's is_shuffled_operator(), so that SetOperationNode propagates the "downstream depends on hash" flag correctly through chains.
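The guard logic described above can be sketched as two small decision functions. This is an illustration under assumptions, not the PR's code: the class name, the `Require` enum, and the boolean parameters are hypothetical, `BASE` stands for "fall back to the base requirement", and the aggregate is assumed to have grouping keys.

```java
/** Hypothetical sketch of the flag-gated requirement for non-final aggregation. */
final class AggLocalExchangeSketch {
    enum Require { HASH, BASE }

    /** AggSink: only a non-final, non-merge (LOCAL pre-agg) phase is performance-only. */
    static Require aggSinkRequire(boolean needsFinalize, boolean isMerge,
                                  boolean enableLocalExchangeBeforeAgg) {
        boolean performanceOnly = !needsFinalize && !isMerge;
        if (performanceOnly && !enableLocalExchangeBeforeAgg) {
            return Require.BASE; // the flag may skip HASH only when correctness does not depend on it
        }
        return Require.HASH;     // FIRST_MERGE and final phases always need HASH
    }

    /** DistinctStreamingAgg: only streaming pre-aggregation is performance-only. */
    static Require distinctStreamingRequire(boolean needsFinalize, boolean useStreamingPreagg,
                                            boolean enableLocalExchangeBeforeAgg) {
        boolean performanceOnly = !needsFinalize && useStreamingPreagg;
        if (performanceOnly && !enableLocalExchangeBeforeAgg) {
            return Require.BASE;
        }
        return Require.HASH;     // non-streaming dedup always needs HASH
    }

    public static void main(String[] args) {
        // FIRST_MERGE with the flag off still requires HASH.
        System.out.println(aggSinkRequire(false, true, false));
    }
}
```

The point of the sketch is that the flag is consulted only after the performance-only case has been isolated, so turning it off cannot remove a shuffle that correctness depends on.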

Session variables

  • enable_local_shuffle_planner: use the FE planner (default true)
  • enable_local_shuffle: master switch for local shuffle
  • enable_local_exchange_before_agg: insert a HASH LE before non-final aggregation (default true, mirrors #62438)

Architectural notes

This PR puts the FE planner in the driver's seat for LE insertion but intentionally keeps BE-side machinery as a fallback:

  1. is_serial_operator is still computed on both sides — any future change to BE's per-operator C++ override must be mirrored in FE.
  2. Legacy BE planner (pipeline_fragment_context.cpp::_plan_local_exchange) is preserved and gated by runtime_state.h::plan_local_shuffle(); the two paths are mutually exclusive.
  3. _propagate_local_exchange_num_tasks is kept as a runtime safety net for paired-pipeline num_tasks mismatches.

Build fixes (cross-toolchain portability)

  • multi_version.h: replace atomic_load/atomic_store (deprecated in libstdc++ C++20 / LLVM 20) with std::shared_mutex-based RW locking.
  • memory.cpp: fix std::max type mismatch (long vs int64_t) on macOS.
  • bucketed_aggregation_sink_operator.h: fix ExchangeType::NOOP → TLocalPartitionType::NOOP after thrift enum rename.

Release note

Add session variable enable_local_shuffle_planner (default true) to control whether local exchange nodes are planned in FE (new path) or in BE (legacy _plan_local_exchange). The two paths are mutually exclusive; the legacy path remains intact behind this flag.

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes. Local exchange node insertion now happens at FE planning time when enable_local_shuffle_planner=true (default). Plan shapes (LOCAL_EXCHANGE_NODE in TPlanNode) and exchange counts may differ from the legacy BE-planned path, but query results remain equivalent. Setting enable_local_shuffle_planner=false restores the legacy behavior bit-for-bit.
  • Does this need documentation?

    • No.
    • Yes. Session variable enable_local_shuffle_planner should be added to the documentation; doc PR will be filed separately.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

@hello-stephen
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@924060929
Contributor Author

/review

@924060929 924060929 force-pushed the fe_local_shuffle_rebase3 branch from 2e22e0a to 6fa1901 Compare May 18, 2026 11:56
@924060929
Contributor Author

run buildall

Contributor

@github-actions github-actions Bot left a comment

I found two blocking issues in the FE local exchange planner path. Critical checkpoints:

  • Goal/test coverage: the PR implements FE-side local exchange planning with substantial unit/regression coverage, but the new default path is not safe for mixed FE/BE versions and has a correctness gap in serial-source gating.
  • Scope: the change is focused on local exchange planning, though it is broad and cross-module.
  • Concurrency/lifecycle: no direct new shared Java concurrency issue found; BE deferred local exchanger lifecycle appears intentionally wired before pipeline prepare.
  • Configuration/compatibility: enable_local_shuffle_planner defaults to true and sends new thrift plan nodes/types without a visible BE-version gate, which is a rolling-upgrade blocker.
  • Parallel paths: the old coordinator is forced to BE planning, but the Nereids FE path still needs the compatibility and serial-gating fixes.
  • Testing: good coverage was added, but it does not cover mixed-version execution or the non-serial-source fragment cases noted inline.
  • User focus: no additional user-provided focus points.

description = {"是否在FE规划Local Shuffle",
"Whether to plan local shuffle in frontend"}, needForward = true)
private boolean enableLocalShufflePlanner = true;

Contributor

This default enables the FE planner to serialize the new LOCAL_EXCHANGE_NODE/TLocalPartitionType protocol to every BE as soon as the FE is upgraded. During a rolling upgrade, an old BE does not have TPlanNodeType::LOCAL_EXCHANGE_NODE handling in _create_operator and will reject/fail such fragments, while RuntimeState::plan_local_shuffle() also disables the legacy BE planner because enable_local_shuffle_planner is set. Please gate this path on BE capability/version (or keep the default off until all BEs support the new node) so mixed FE/BE deployments continue to execute queries during upgrade.
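
One way to read the gating suggestion above, as a purely hypothetical sketch: the class `PlannerGateSketch`, the integer version numbers, and the `minSupportedVersion` threshold are all invented for illustration and do not correspond to any existing Doris API or version scheme.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical capability gate: use the FE planner only if every BE can execute its output. */
final class PlannerGateSketch {
    static boolean useFePlanner(boolean enableLocalShufflePlanner,
                                List<Integer> beVersions,
                                int minSupportedVersion) {
        if (!enableLocalShufflePlanner) {
            return false; // session variable off: the legacy BE planner handles local exchange
        }
        for (int version : beVersions) {
            if (version < minSupportedVersion) {
                return false; // at least one BE predates LOCAL_EXCHANGE_NODE support
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Mixed cluster during a rolling upgrade: fall back to the legacy path.
        System.out.println(useFePlanner(true, Arrays.asList(3, 4, 4), 4));
    }
}
```

With such a gate the default could stay on, because a cluster containing any older BE would keep using the legacy path until the last BE is upgraded.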

Contributor Author

Our deployment process upgrades BE before FE, so during a rolling upgrade an old BE never receives LOCAL_EXCHANGE_NODE from a new FE — the new thrift node type is only sent after all BEs in the cluster have been upgraded. Keeping the default true is intentional so we get the FE planner enabled out of the box once the rollout completes. If you have a deployment topology that flips this order, please set enable_local_shuffle_planner=false in fe.conf or via SQL and the legacy BE path stays bit-for-bit identical to the old behavior.

// 4. Layer 1: skip LE when serial operator or ancestor in same pipeline
// Equivalent to BE's need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip
if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialNode()) {
return childOutput;
Contributor

This skip uses isSerialNode(), even though the comment above isSerialOperatorOnBe() says that a node with isSerialNode()==true only actually runs with one BE task when fragment.useSerialSource(context) is true. For fragments where useSerialSource is false (for example ignore_storage_data_distribution=false, query cache, or NAAJ), a node such as a scalar aggregate or unpartitioned exchange can still return isSerialNode()==true, but BE will execute it with normal parallelism (is_serial_operator=false in thrift). In that case this branch skips a required LocalExchange even though BE would not consider the ancestor serial, so downstream hash/passthrough requirements can be silently dropped. The serial-ancestor propagation at line 1094 has the same issue. Please base these planner decisions on isSerialOperatorOnBe(translatorContext.getConnectContext()), not the syntactic isSerialNode(), except for the explicitly documented heavy-op/local-fragment cases.

Contributor Author

Good catch — fixed in 15d92ba. Both the Layer 1 skip and the serial-ancestor propagation in enforceRequire now use isSerialOperatorOnBe(translatorContext.getConnectContext()) instead of the raw isSerialNode().

Verified that BE's OperatorBase constructs from the Thrift is_serial_operator flag (which FE writes via isSerialOperatorOnBe, not isSerialNode) — so Pipeline::need_to_local_exchange's op->is_serial_operator() check returns false when fragment.useSerialSource(ctx) is false, even if isSerialNode() is true. The previous code would have over-skipped LocalExchange in exactly the scenarios you listed (ignore_storage_data_distribution=false, query cache, NAAJ).

Updated LocalShuffleNodeCoverageTest.testMaterializationNode and testSetOperationAndAssertNumRowsNode to reflect the corrected behavior: in the fragment-less unit-test path isSerialOperatorOnBe returns false (the fragment != null guard) so the framework no longer skips Layer 1 and inserts the required LocalExchange.

Other isSerialNode() call sites in PlanNode.java were audited and left as-is:

  • toThrift() already uses isSerialOperatorOnBe
  • hasSerialChildren() is a pure node-level tree walk used only for fragment-internal heuristics
  • createLocalExchange() heavy-op gate is already inside a fragment.useSerialSource(ctx) branch, so isSerialNode and isSerialOperatorOnBe are equivalent there
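
The distinction discussed in this thread can be sketched as follows. This is a simplified model based only on what the comments state (the `fragment != null` guard and the `useSerialSource` condition); the classes here are hypothetical, and the real `isSerialOperatorOnBe` takes a `ConnectContext` argument that the sketch drops.

```java
/** Hypothetical model of the node-level vs. BE-level serial flag. */
final class SerialFlagSketch {
    static final class Fragment {
        final boolean useSerialSource;

        Fragment(boolean useSerialSource) {
            this.useSerialSource = useSerialSource;
        }
    }

    static final class Node {
        final boolean serialNode;
        final Fragment fragment; // may be null, e.g. in fragment-less unit tests

        Node(boolean serialNode, Fragment fragment) {
            this.serialNode = serialNode;
            this.fragment = fragment;
        }

        /** Syntactic, node-level property. */
        boolean isSerialNode() {
            return serialNode;
        }

        /** What BE actually sees: serial only when the fragment uses a serial source. */
        boolean isSerialOperatorOnBe() {
            return fragment != null && fragment.useSerialSource && isSerialNode();
        }

        /** Layer 1 skip after the fix: keyed on the BE-visible flag. */
        boolean skipLocalExchange(boolean hasSerialAncestorInPipeline) {
            return hasSerialAncestorInPipeline || isSerialOperatorOnBe();
        }
    }

    public static void main(String[] args) {
        Node scalarAgg = new Node(true, new Fragment(false));
        // Serial by node type, but BE runs it in parallel, so the exchange is not skipped.
        System.out.println(scalarAgg.skipLocalExchange(false));
    }
}
```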

Previously, local exchange (LE) nodes were inserted exclusively by the
BE's `_plan_local_exchange` at pipeline build time.  The FE had no
visibility into which operators needed a fan-out or shuffle before
execution, making it impossible to validate, optimize, or override LE
decisions at planning time.

This PR introduces a full FE-side local exchange planner that mirrors
BE semantics, brings several correctness fixes, and leaves the legacy
BE path fully intact behind a feature flag.  See "Current architecture
notes" at the bottom for what the FE planner does and does not own.

A new `AddLocalExchange` pass runs after normal fragment assignment.
It walks each fragment's plan tree bottom-up, calling the polymorphic
`PlanNode.enforceAndDeriveLocalExchange()` on every node.  Nodes
declare what distribution they require of their children; the framework
inserts `LocalExchangeNode` where needed.

`LocalExchangeNode` represents intra-fragment data redistribution and
supports the full set of exchange types: PASSTHROUGH, HASH_SHUFFLE,
BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE, BROADCAST,
PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, and NOOP.

The pass is guarded by `enable_local_shuffle_planner` (default true).
When disabled, BE continues to run its own `_plan_local_exchange` as
before, keeping the old path fully intact.

`maxPerBeInstances` (max pipeline instances assigned to any single BE)
is used instead of a global `instanceCount`.  Planning is a no-op when
`maxPerBeInstances == 1` — inserting LE on a single-threaded pipeline
would cause task-count mismatches and pipeline starvation.
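
To illustrate why the per-BE maximum differs from the global count, here is a small sketch. The class name and the representation of instance assignment as a list of BE host names are assumptions made for the example, not the planner's actual data structures.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: largest number of instances assigned to any single BE. */
final class MaxPerBeInstancesSketch {
    static int maxPerBeInstances(List<String> instanceHosts) {
        Map<String, Integer> perBe = new HashMap<>();
        int max = 0;
        for (String host : instanceHosts) {
            // merge() returns the updated count for this host.
            max = Math.max(max, perBe.merge(host, 1, Integer::sum));
        }
        return max;
    }

    /** Local exchange planning only matters when some BE runs more than one instance. */
    static boolean shouldPlanLocalExchange(List<String> instanceHosts) {
        return maxPerBeInstances(instanceHosts) > 1;
    }

    public static void main(String[] args) {
        // Three instances globally, but one per BE: nothing to plan.
        System.out.println(shouldPlanLocalExchange(Arrays.asList("be1", "be2", "be3")));
    }
}
```

A fragment with three instances spread over three BEs has a global count of 3 but a per-BE maximum of 1, so a check against the global count would plan exchanges that no BE can use.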

When a serial operator (e.g. OlapScanNode with a single tablet bucket)
feeds a non-serial parent without an intermediate LE, downstream tasks
starve waiting for data that never arrives.  The framework detects this
case and inserts a PASSTHROUGH LE to restore N-task parallelism, exactly
matching BE's `required_data_distribution()` serial → PASSTHROUGH rule.

`LocalExchangeTypeRequire` abstracts two strategies:
- `RequireHash` — always resolves to `LOCAL_EXECUTION_HASH_SHUFFLE`
  (safe for intra-fragment hash partitioning).
- `RequireSpecific` — preserves BUCKET_HASH_SHUFFLE /
  GLOBAL_EXECUTION_HASH_SHUFFLE without degradation.
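
A rough sketch of the two strategies, under one possible reading: `RequireHash` accepts any hash flavour the child already provides and otherwise inserts `LOCAL_EXECUTION_HASH_SHUFFLE`, while `RequireSpecific` accepts and inserts only the exact type. The interface shape, method names, and `resolve` helper are hypothetical, and partition keys are ignored.

```java
/** Hypothetical model of requirement-based exchange type resolution. */
final class TypeRequireSketch {
    enum Type {
        NOOP, PASSTHROUGH, LOCAL_EXECUTION_HASH_SHUFFLE,
        GLOBAL_EXECUTION_HASH_SHUFFLE, BUCKET_HASH_SHUFFLE;

        boolean isHashShuffle() {
            return this == LOCAL_EXECUTION_HASH_SHUFFLE
                    || this == GLOBAL_EXECUTION_HASH_SHUFFLE
                    || this == BUCKET_HASH_SHUFFLE;
        }
    }

    interface Require {
        boolean satisfiedBy(Type childOutput);

        Type typeToInsert();
    }

    static Require requireHash() {
        return new Require() {
            public boolean satisfiedBy(Type childOutput) {
                return childOutput.isHashShuffle(); // any hash flavour is acceptable
            }

            public Type typeToInsert() {
                return Type.LOCAL_EXECUTION_HASH_SHUFFLE;
            }
        };
    }

    static Require requireSpecific(final Type wanted) {
        return new Require() {
            public boolean satisfiedBy(Type childOutput) {
                return childOutput == wanted; // no substitution allowed
            }

            public Type typeToInsert() {
                return wanted;
            }
        };
    }

    /** Returns NOOP when the child already satisfies the requirement. */
    static Type resolve(Require require, Type childOutput) {
        return require.satisfiedBy(childOutput) ? Type.NOOP : require.typeToInsert();
    }

    public static void main(String[] args) {
        System.out.println(resolve(requireHash(), Type.PASSTHROUGH));
    }
}
```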

PR #62438 added `enable_local_exchange_before_agg`, but its BE guard
`!_needs_finalize && !enable_local_exchange_before_agg → base` conflated
two semantically different cases in AggSink and DistinctStreamingAgg:

- **AggSink**: `!finalize && hasKeys` covered both LOCAL preagg
  (performance-only) and FIRST_MERGE dedup (correctness-critical).
  The flag-gated early-return wrongly skipped HASH for FIRST_MERGE,
  producing PASSTHROUGH-over-serial-child → wrong aggregation results.

- **DistinctStreamingAgg**: `!finalize` covered both streaming preagg
  (`useStreamingPreagg=true`, performance) and non-streaming dedup
  (`useStreamingPreagg=false`, correctness).  Same class of bug.

FE fix:
- AggSink: restrict the flag-gated base path to `!isMerge()` LOCAL
  phases.  FIRST_MERGE always emits HASH regardless of the flag.
- DistinctStreamingAgg: restrict to `useStreamingPreagg=true`.
  Non-streaming dedup always emits HASH.

Also add `requiresShuffleForCorrectness()` to mirror BE's
`is_shuffled_operator()`, so SetOperationNode propagates the
"downstream depends on hash" flag correctly instead of using the
coarser `parentRequire.preferType().isHashShuffle()` check that
over-inserted HASH LE on every union branch under a streaming preagg.

These fixes reduce FE/BE consistency mismatches from 8 to 3
(only pre-existing NLJ optimization differences remain).

- `enable_local_shuffle_planner` — use FE planner (default true)
- `enable_local_shuffle` — master switch for local shuffle
- `enable_local_exchange_before_agg` — HASH LE before non-final agg
  (default true, mirrors #62438)

`validateNoSerialWithoutLocalExchange()` walks the final plan tree and
logs a warning whenever a serial operator feeds a non-serial parent
without an intermediate LocalExchangeNode, catching planning gaps
before execution.
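
The check can be pictured as a plain tree walk. The sketch below is an assumption about its shape rather than the PR's implementation: the `Node` class and the warning text are invented, and warnings are collected into a list instead of being logged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical validation walk: flag serial children that feed non-serial parents directly. */
final class SerialValidationSketch {
    static final class Node {
        final String name;
        final boolean serial;
        final boolean localExchange;
        final List<Node> children = new ArrayList<>();

        Node(String name, boolean serial, boolean localExchange, Node... kids) {
            this.name = name;
            this.serial = serial;
            this.localExchange = localExchange;
            Collections.addAll(children, kids);
        }
    }

    static List<String> validate(Node root) {
        List<String> warnings = new ArrayList<>();
        collect(root, warnings);
        return warnings;
    }

    private static void collect(Node parent, List<String> warnings) {
        for (Node child : parent.children) {
            // A LocalExchange parent is the intermediate exchange itself, so it is fine.
            if (child.serial && !parent.serial && !parent.localExchange) {
                warnings.add("serial " + child.name + " feeds non-serial "
                        + parent.name + " without a LocalExchange");
            }
            collect(child, warnings);
        }
    }

    public static void main(String[] args) {
        Node scan = new Node("Scan", true, false);
        Node agg = new Node("Agg", false, false, scan);
        System.out.println(validate(agg));
    }
}
```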

- `test_enable_local_exchange_before_agg.groovy` — 10 agg patterns
  with the flag on and off; covers the FIRST_MERGE and
  DistinctStreamingAgg correctness fixes.
- `test_local_shuffle_fe_be_consistency.groovy` — runs the same SQL
  with `enable_local_shuffle_planner=true` and `=false` across the
  full operator matrix (Agg, Sort, Analytic, HashJoin, NLJ, Set, Union,
  TableFunction, AssertNumRows, RQG-derived corner cases) and asserts
  result rows are identical.  Only data correctness is asserted — the
  two planners legitimately differ on the exact exchange counts/types
  they emit, so plan-shape equality is intentionally not checked.
- `test_local_shuffle_rqg_bugs.groovy` — reproduces 20+ RQG-found
  crashes and wrong-result cases.
- `test_old_coordinator_local_shuffle.groovy` — verifies the old
  coordinator path is unaffected.
- `test_multilevel_join_agg_local_shuffle.groovy` — multi-level join
  and aggregation plan shapes.

- `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in
  libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking.
- `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`)
  on macOS.
- `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` →
  `TLocalPartitionType::NOOP` after thrift enum rename.

This PR puts the FE planner in the driver's seat for LE insertion but
intentionally does NOT remove the BE-side machinery — readers should be
aware of three pieces the FE planner shares with or defers to BE:

1. **`is_serial_operator` is computed on both sides.**  FE computes the
   flag and writes it into Thrift, but BE's
   `OperatorBase::is_serial_operator()` is still overridden per operator
   in C++ and used for BE-side runtime decisions.  Any future change to
   the BE override needs to be mirrored on the FE side (and vice versa)
   to keep the planner's view consistent with execution.

2. **The legacy BE planner stays as a fallback.**
   `pipeline_fragment_context.cpp::_plan_local_exchange` is preserved
   and gated by `runtime_state.h::plan_local_shuffle()`: when
   `enable_local_shuffle_planner=false`, BE plans LE itself, exactly as
   before.  The two paths are mutually exclusive, never both running on
   the same query.

3. **`_propagate_local_exchange_num_tasks` is kept as a runtime safety
   net.**  The two propagation passes in
   `pipeline_fragment_context.cpp` fix up paired pipelines whose
   `num_tasks` end up mismatched (e.g. when AGG/SORT/JOIN pipeline
   splits leave a serial Exchange feeding an N-task sink).  FE's
   framework-level serial→non-serial fan-out (`enforceRequire` step 3)
   and the `validateNoSerialWithoutLocalExchange` check aim to make
   these mismatches impossible by construction, but the BE-side fixup
   remains as a defensive guard.

Co-authored-by: Gabriel <liwenqiang@selectdb.com>
@924060929 924060929 force-pushed the fe_local_shuffle_rebase3 branch from 6fa1901 to 15d92ba Compare May 18, 2026 13:21
@924060929
Contributor Author

run buildall

@hello-stephen
Contributor

FE UT Coverage Report

Increment line coverage 73.93% (431/583) 🎉
Increment coverage report
Complete coverage report

@hello-stephen
Contributor

Cloud UT Coverage Report

Increment line coverage 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 78.06% (1854/2375)
Line Coverage 64.52% (33325/51653)
Region Coverage 65.21% (16520/25335)
Branch Coverage 55.70% (8827/15848)

@hello-stephen
Contributor

TPC-H: Total hot run time: 31734 ms
machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
scripts: https://github.com/apache/doris/tree/master/tools/tpch-tools
Tpch sf100 test result on commit 15d92ba024b265d9441145d7bd26d93590aa9c63, data reload: false

------ Round 1 ----------------------------------
orders	Doris	NULL	NULL	0	0	0	NULL	0	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	17803	3914	3926	3914
q2	q3	10929	1434	801	801
q4	4816	481	340	340
q5	10658	2257	2105	2105
q6	397	175	139	139
q7	914	793	617	617
q8	9633	1832	1667	1667
q9	6923	5130	4985	4985
q10	6518	2090	1770	1770
q11	430	268	239	239
q12	668	423	298	298
q13	18212	3499	2774	2774
q14	260	253	239	239
q15	q16	823	779	721	721
q17	918	940	963	940
q18	7080	5849	6162	5849
q19	1241	1333	1062	1062
q20	503	407	258	258
q21	6216	2696	2699	2696
q22	458	385	320	320
Total cold run time: 105400 ms
Total hot run time: 31734 ms

----- Round 2, with runtime_filter_mode=off -----
orders	Doris	NULL	NULL	150000000	42	6422171781	NULL	22778155	NULL	NULL	2023-12-26 18:27:23	2023-12-26 18:42:55	NULL	utf-8	NULL	NULL	
============================================
q1	4514	4779	4621	4621
q2	q3	4801	5222	4621	4621
q4	2181	2213	1441	1441
q5	4809	4669	4682	4669
q6	232	191	134	134
q7	1807	1659	1400	1400
q8	2178	1885	1875	1875
q9	7379	7481	7388	7388
q10	4474	4422	3979	3979
q11	540	377	348	348
q12	725	726	508	508
q13	2994	3423	2767	2767
q14	270	277	253	253
q15	q16	670	702	606	606
q17	1267	1240	1233	1233
q18	7261	6876	6773	6773
q19	1096	1105	1118	1105
q20	2217	2232	1921	1921
q21	5321	4605	4449	4449
q22	523	482	399	399
Total cold run time: 55259 ms
Total hot run time: 50490 ms

@hello-stephen
Contributor

BE Regression && UT Coverage Report

Increment line coverage 72.45% (305/421) 🎉

Increment coverage report
Complete coverage report

Category Coverage
Function Coverage 63.48% (23997/37804)
Line Coverage 47.20% (247315/523994)
Region Coverage 44.16% (203214/460125)
Branch Coverage 45.44% (87932/193505)

@hello-stephen
Contributor

FE Regression Coverage Report

Increment line coverage 63.93% (468/732) 🎉
Increment coverage report
Complete coverage report
