[refactor](local shuffle) Move local exchange planning from BE to FE#63366
[refactor](local shuffle) Move local exchange planning from BE to FE#63366924060929 wants to merge 1 commit into
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
/review |
2e22e0a to
6fa1901
Compare
|
run buildall |
There was a problem hiding this comment.
I found two blocking issues in the FE local exchange planner path. Critical checkpoints: goal/test coverage: the PR implements FE-side local exchange planning with substantial unit/regression coverage, but the new default path is not safe for mixed FE/BE versions and has a correctness gap in serial-source gating. Scope: the change is focused on local exchange planning, though it is broad and cross-module. Concurrency/lifecycle: no direct new shared Java concurrency issue found; BE deferred local exchanger lifecycle appears intentionally wired before pipeline prepare. Configuration/compatibility: enable_local_shuffle_planner defaults to true and sends new thrift plan nodes/types without a visible BE-version gate, which is a rolling-upgrade blocker. Parallel paths: the old coordinator is forced to BE planning, but the Nereids FE path still needs the compatibility and serial-gating fixes. Testing: good coverage was added, but it does not cover mixed-version execution or the non-serial-source fragment cases noted inline. User focus: no additional user-provided focus points.
| description = {"是否在FE规划Local Shuffle", | ||
| "Whether to plan local shuffle in frontend"}, needForward = true) | ||
| private boolean enableLocalShufflePlanner = true; | ||
|
|
There was a problem hiding this comment.
This default enables the FE planner to serialize the new LOCAL_EXCHANGE_NODE/TLocalPartitionType protocol to every BE as soon as the FE is upgraded. During a rolling upgrade, an old BE does not have TPlanNodeType::LOCAL_EXCHANGE_NODE handling in _create_operator and will reject/fail such fragments, while RuntimeState::plan_local_shuffle() also disables the legacy BE planner because enable_local_shuffle_planner is set. Please gate this path on BE capability/version (or keep the default off until all BEs support the new node) so mixed FE/BE deployments continue to execute queries during upgrade.
There was a problem hiding this comment.
Our deployment process upgrades BE before FE, so during a rolling upgrade an old BE never receives LOCAL_EXCHANGE_NODE from a new FE — the new thrift node type is only sent after all BEs in the cluster have been upgraded. Keeping the default true is intentional so we get the FE planner enabled out of the box once the rollout completes. If you have a deployment topology that flips this order, please set enable_local_shuffle_planner=false in fe.conf or via SQL and the legacy BE path stays bit-for-bit identical to the old behavior.
| // 4. Layer 1: skip LE when serial operator or ancestor in same pipeline | ||
| // Equivalent to BE's need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip | ||
| if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialNode()) { | ||
| return childOutput; |
There was a problem hiding this comment.
This skip uses isSerialNode() even though the comment above isSerialOperatorOnBe() says an isSerialNode() only actually runs with one BE task when fragment.useSerialSource(context) is true. For fragments where useSerialSource is false (for example ignore_storage_data_distribution=false, query cache, or NAAJ), a node such as a scalar aggregate or unpartitioned exchange can still return isSerialNode()==true but BE will execute it with normal parallelism (is_serial_operator=false in thrift). In that case this branch skips a required LocalExchange even though BE would not consider the ancestor serial, so downstream hash/passthrough requirements can be silently dropped. The serial-ancestor propagation at line 1094 has the same issue. Please base these planner decisions on isSerialOperatorOnBe(translatorContext.getConnectContext()), not the syntactic isSerialNode(), except for the explicitly documented heavy-op/local-fragment cases.
There was a problem hiding this comment.
Good catch — fixed in 15d92ba. Both the Layer 1 skip and the serial-ancestor propagation in enforceRequire now use isSerialOperatorOnBe(translatorContext.getConnectContext()) instead of the raw isSerialNode().
Verified that BE's OperatorBase constructs from the Thrift is_serial_operator flag (which FE writes via isSerialOperatorOnBe, not isSerialNode) — so Pipeline::need_to_local_exchange's op->is_serial_operator() check returns false when fragment.useSerialSource(ctx) is false, even if isSerialNode() is true. The previous code would have over-skipped LocalExchange in exactly the scenarios you listed (ignore_storage_data_distribution=false, query cache, NAAJ).
Updated LocalShuffleNodeCoverageTest.testMaterializationNode and testSetOperationAndAssertNumRowsNode to reflect the corrected behavior: in the fragment-less unit-test path isSerialOperatorOnBe returns false (the fragment != null guard) so the framework no longer skips Layer 1 and inserts the required LocalExchange.
Other isSerialNode() call sites in PlanNode.java were audited and left as-is:
toThrift()already usesisSerialOperatorOnBehasSerialChildren()is a pure node-level tree walk used only for fragment-internal heuristicscreateLocalExchange()heavy-op gate is already inside afragment.useSerialSource(ctx)branch, soisSerialNodeandisSerialOperatorOnBeare equivalent there
Previously, local exchange (LE) nodes were inserted exclusively by the BE's `_plan_local_exchange` at pipeline build time. The FE had no visibility into which operators needed a fan-out or shuffle before execution, making it impossible to validate, optimize, or override LE decisions at planning time. This PR introduces a full FE-side local exchange planner that mirrors BE semantics, brings several correctness fixes, and leaves the legacy BE path fully intact behind a feature flag. See "Current architecture notes" at the bottom for what the FE planner does and does not own. A new `AddLocalExchange` pass runs after normal fragment assignment. It walks each fragment's plan tree bottom-up, calling the polymorphic `PlanNode.enforceAndDeriveLocalExchange()` on every node. Nodes declare what distribution they require of their children; the framework inserts `LocalExchangeNode` where needed. `LocalExchangeNode` represents intra-fragment data redistribution and supports the full set of exchange types: PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE, BROADCAST, PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, and NOOP. The pass is guarded by `enable_local_shuffle_planner` (default true). When disabled, BE continues to run its own `_plan_local_exchange` as before, keeping the old path fully intact. `maxPerBeInstances` (max pipeline instances assigned to any single BE) is used instead of a global `instanceCount`. Planning is a no-op when `maxPerBeInstances == 1` — inserting LE on a single-threaded pipeline would cause task-count mismatches and pipeline starvation. When a serial operator (e.g. OlapScanNode with a single tablet bucket) feeds a non-serial parent without an intermediate LE, downstream tasks starve waiting for data that never arrives. The framework detects this case and inserts a PASSTHROUGH LE to restore N-task parallelism, exactly matching BE's `required_data_distribution()` serial → PASSTHROUGH rule. `LocalExchangeTypeRequire` abstracts two strategies: - `RequireHash` — always resolves to `LOCAL_EXECUTION_HASH_SHUFFLE` (safe for intra-fragment hash partitioning). - `RequireSpecific` — preserves BUCKET_HASH_SHUFFLE / GLOBAL_EXECUTION_HASH_SHUFFLE without degradation. PR #62438 added `enable_local_exchange_before_agg`, but its BE guard `!_needs_finalize && !enable_local_exchange_before_agg → base` conflated two semantically different cases in AggSink and DistinctStreamingAgg: - **AggSink**: `!finalize && hasKeys` covered both LOCAL preagg (performance-only) and FIRST_MERGE dedup (correctness-critical). The flag-gated early-return wrongly skipped HASH for FIRST_MERGE, producing PASSTHROUGH-over-serial-child → wrong aggregation results. - **DistinctStreamingAgg**: `!finalize` covered both streaming preagg (`useStreamingPreagg=true`, performance) and non-streaming dedup (`useStreamingPreagg=false`, correctness). Same class of bug. FE fix: - AggSink: restrict the flag-gated base path to `!isMerge()` LOCAL phases. FIRST_MERGE always emits HASH regardless of the flag. - DistinctStreamingAgg: restrict to `useStreamingPreagg=true`. Non-streaming dedup always emits HASH. Also add `requiresShuffleForCorrectness()` to mirror BE's `is_shuffled_operator()`, so SetOperationNode propagates the "downstream depends on hash" flag correctly instead of using the coarser `parentRequire.preferType().isHashShuffle()` check that over-inserted HASH LE on every union branch under a streaming preagg. These fixes reduce FE/BE consistency mismatches from 8 to 3 (only pre-existing NLJ optimization differences remain). - `enable_local_shuffle_planner` — use FE planner (default true) - `enable_local_shuffle` — master switch for local shuffle - `enable_local_exchange_before_agg` — HASH LE before non-final agg (default true, mirrors #62438) `validateNoSerialWithoutLocalExchange()` walks the final plan tree and logs a warning whenever a serial operator feeds a non-serial parent without an intermediate LocalExchangeNode, catching planning gaps before execution. - `test_enable_local_exchange_before_agg.groovy` — 10 agg patterns with the flag on and off; covers the FIRST_MERGE and DistinctStreamingAgg correctness fixes. - `test_local_shuffle_fe_be_consistency.groovy` — runs the same SQL with `enable_local_shuffle_planner=true` and `=false` across the full operator matrix (Agg, Sort, Analytic, HashJoin, NLJ, Set, Union, TableFunction, AssertNumRows, RQG-derived corner cases) and asserts result rows are identical. Only data correctness is asserted — the two planners legitimately differ on the exact exchange counts/types they emit, so plan-shape equality is intentionally not checked. - `test_local_shuffle_rqg_bugs.groovy` — reproduces 20+ RQG-found crashes and wrong-result cases. - `test_old_coordinator_local_shuffle.groovy` — verifies the old coordinator path is unaffected. - `test_multilevel_join_agg_local_shuffle.groovy` — multi-level join and aggregation plan shapes. - `multi_version.h`: replace `atomic_load/atomic_store` (deprecated in libstdc++ C++20 / LLVM 20) with `std::shared_mutex`-based RW locking. - `memory.cpp`: fix `std::max` type mismatch (`long` vs `int64_t`) on macOS. - `bucketed_aggregation_sink_operator.h`: fix `ExchangeType::NOOP` → `TLocalPartitionType::NOOP` after thrift enum rename. This PR puts the FE planner in the driver's seat for LE insertion but intentionally does NOT remove the BE-side machinery — readers should be aware of three pieces the FE planner shares with or defers to BE: 1. **`is_serial_operator` is computed on both sides.** FE computes the flag and writes it into Thrift, but BE's `OperatorBase::is_serial_operator()` is still overridden per operator in C++ and used for BE-side runtime decisions. Any future change to the BE override needs to be mirrored on the FE side (and vice versa) to keep the planner's view consistent with execution. 2. **The legacy BE planner stays as a fallback.** `pipeline_fragment_context.cpp::_plan_local_exchange` is preserved and gated by `runtime_state.h::plan_local_shuffle()`: when `enable_local_shuffle_planner=false`, BE plans LE itself, exactly as before. The two paths are mutually exclusive, never both running on the same query. 3. **`_propagate_local_exchange_num_tasks` is kept as a runtime safety net.** The two propagation passes in `pipeline_fragment_context.cpp` fix up paired pipelines whose `num_tasks` end up mismatched (e.g. when AGG/SORT/JOIN pipeline splits leave a serial Exchange feeding an N-task sink). FE's framework-level serial→non-serial fan-out (`enforceRequire` step 3) and the `validateNoSerialWithoutLocalExchange` check aim to make these mismatches impossible by construction, but the BE-side fixup remains as a defensive guard. Co-authored-by: Gabriel <liwenqiang@selectdb.com>
6fa1901 to
15d92ba
Compare
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
Cloud UT Coverage ReportIncrement line coverage Increment coverage report
|
TPC-H: Total hot run time: 31734 ms |
BE Regression && UT Coverage ReportIncrement line coverage Increment coverage report
|
FE Regression Coverage ReportIncrement line coverage |
What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary:
Move local exchange (LE) planning from BE's
_plan_local_exchange(pipeline build time) to a new FE-side planner. The FE planner mirrors BE semantics, brings several correctness fixes, and is gated by a session variable so the legacy BE path stays available as a fallback.Core design
AddLocalExchangepass runs afterDistributePlanner, walking each fragment's plan tree bottom-up via the polymorphicPlanNode.enforceAndDeriveLocalExchange(). Each node declares what distribution it requires of its children; the framework insertsLocalExchangeNodewhere needed.LocalExchangeNoderepresents intra-fragment data redistribution and supports PASSTHROUGH, GLOBAL/LOCAL/BUCKET HASH_SHUFFLE, BROADCAST, PASS_TO_ONE, ADAPTIVE_PASSTHROUGH, LOCAL_MERGE_SORT, NOOP.maxPerBeInstances(max pipeline instances assigned to any single BE) is used instead of global instance count to match BE's_num_instancescheck. Planning is a no-op whenmaxPerBeInstances == 1.required_data_distribution()rule.LocalExchangeTypeRequire:RequireHashadapts to any hash flavour,RequireSpecificpreserves the exact requested type.AggregationNode correctness fixes (DORIS-25413)
PR #62438 introduced a semantic split for
required_data_distribution=HASH(correctness-required vs performance-only). BE's!_needs_finalize && !enable_local_exchange_before_agg → baseearly-return conflates both intents inAggSinkOperatorXandDistinctStreamingAggOperatorX, wrongly catching FIRST_MERGE (correctness) / non-streaming dedup (correctness) and producing PASSTHROUGH-over-serial-child → wrong aggregation results. The FE planner adds the missing!isMerge()/useStreamingPreagg=trueguards so FIRST_MERGE and non-streaming dedup always emit HASH, regardless of the flag. Also addsrequiresShuffleForCorrectness()(mirrors BE'sis_shuffled_operator()) so SetOperationNode propagates the "downstream depends on hash" flag correctly through chains.Session variables
enable_local_shuffle_planner(default true) — use FE planner; when false, BE plans LE itself via the legacy path.enable_local_shuffle— master switch.enable_local_exchange_before_agg— mirrors [Improvement](agg) Add a knob to control local exchange #62438.Architectural notes
This PR puts the FE planner in the driver's seat for LE insertion but intentionally keeps BE-side machinery as a fallback:
is_serial_operatoris still computed on both sides — any future change to BE's per-operator C++ override must be mirrored in FE.pipeline_fragment_context.cpp::_plan_local_exchange) is preserved and gated byruntime_state.h::plan_local_shuffle(); the two paths are mutually exclusive._propagate_local_exchange_num_tasksis kept as a runtime safety net for paired-pipeline num_tasks mismatches.Build fixes (cross-toolchain portability)
multi_version.h: replaceatomic_load/atomic_store(deprecated in libstdc++ C++20 / LLVM 20) withstd::shared_mutex-based RW locking.memory.cpp: fixstd::maxtype mismatch (longvsint64_t) on macOS.bucketed_aggregation_sink_operator.h: fixExchangeType::NOOP→TLocalPartitionType::NOOPafter thrift enum rename.Release note
Add session variable
enable_local_shuffle_planner(default true) to control whether local exchange nodes are planned in FE (new path) or in BE (legacy_plan_local_exchange). The two paths are mutually exclusive; the legacy path remains intact behind this flag.Check List (For Author)
Test
Behavior changed:
enable_local_shuffle_planner=true(default). Plan shapes (LOCAL_EXCHANGE_NODE in TPlanNode) and exchange counts may differ from the legacy BE-planned path, but query results remain equivalent. Settingenable_local_shuffle_planner=falserestores the legacy behavior bit-for-bit.Does this need documentation?
enable_local_shuffle_plannershould be added to the documentation; doc PR will be filed separately.Check List (For Reviewer who merge this PR)