
Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries#8529

Open
neildsh wants to merge 18 commits into citusdata:main from neildsh:sortedMerge

Conversation


@neildsh neildsh commented Mar 23, 2026

Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries

Summary

When a multi-shard SELECT ... ORDER BY query goes through the logical planner, Citus currently collects all worker results into a single tuplestore and relies on a PostgreSQL Sort node above the Custom Scan to produce the final ordering. This PR adds an alternative path: push ORDER BY to workers, k-way merge the pre-sorted worker results on the coordinator using a binary heap, and declare pathkeys on the CustomPath so PostgreSQL eliminates the coordinator Sort node entirely.

The feature is gated behind a hidden GUC citus.enable_sorted_merge (default off, PGC_SUSET, GUC_NO_SHOW_ALL). All eligibility decisions are made at planning time and baked into the serialized DistributedPlan, so cached/prepared plans remain correct regardless of later GUC changes.

The sorted merge implementation is based on the Postgres MergeAppend node.

How it works

Workers (each sorted locally)
  └─> per-task tuple stores (routed by taskId, no Task mutation)
        └─> coordinator k-way merge (binaryheap + SortSupport)
              └─> final scanState->tuplestorestate
                    └─> existing ReturnTupleFromTuplestore()
                          └─> quals / projection / backward scan / rescan (unchanged)
  1. Worker sort pushdown (multi_logical_optimizer.c): WorkerSortClauseList() gains an early-return path that pushes ORDER BY to workers even without LIMIT, when the GUC is on and the sort is safe (no aggregates in ORDER BY, no non-pushable window functions, GROUP BY on distribution column or absent).

  2. Planner eligibility (multi_logical_optimizer.c, multi_physical_planner.c): WorkerExtendedOpNode() tags the worker MultiExtendedOp with sortedMergeEligible = true when the worker sort clause semantically matches the original. SetSortedMergeFields() in the physical planner builds SortedMergeKey metadata (attno, sortop, collation, nullsFirst) and sets useSortedMerge on the DistributedPlan.

  3. PathKeys (combine_query_planner.c): CreateCitusCustomScanPath() sets path->pathkeys = root->sort_pathkeys when the plan has useSortedMerge = true, causing PostgreSQL's create_ordered_paths() to skip adding a Sort node.

  4. Per-task stores + k-way merge (sorted_merge.c, adaptive_executor.c): A new PerTaskDispatchTupleDest routes worker tuples to per-task tuplestores by taskId hash lookup (no Task fields mutated). After all tasks complete, MergePerTaskStoresIntoFinalStore() performs a k-way merge using PostgreSQL's public binaryheap and SortSupport APIs, writing sorted output into the existing scanState->tuplestorestate. The existing CitusExecScan()/ReturnTupleFromTuplestore() path is completely unchanged.
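The k-way merge in step 4 follows the same heap discipline as PostgreSQL's MergeAppend node: keep one entry per input on a min-heap ordered by each input's current tuple, repeatedly emit the top, advance that input, and re-sift. Here is a self-contained sketch of the technique in plain C (sorted int arrays stand in for the per-task tuplestores; all names are illustrative, not the PR's actual code):

```c
#include <stdlib.h>

/* One pre-sorted input, standing in for a per-task tuplestore. */
typedef struct Stream
{
    const int *vals;
    int len;
    int pos;                 /* cursor into vals */
} Stream;

/* Current (smallest unconsumed) value of the stream at heap slot h. */
static int head(const Stream *st, const int *heap, int h)
{
    return st[heap[h]].vals[st[heap[h]].pos];
}

/* Restore the min-heap property below index i (cf. binaryheap sifting). */
static void sift_down(int *heap, int nheap, const Stream *st, int i)
{
    for (;;)
    {
        int l = 2 * i + 1, r = 2 * i + 2, m = i;
        if (l < nheap && head(st, heap, l) < head(st, heap, m))
            m = l;
        if (r < nheap && head(st, heap, r) < head(st, heap, m))
            m = r;
        if (m == i)
            return;
        int tmp = heap[i]; heap[i] = heap[m]; heap[m] = tmp;
        i = m;
    }
}

/* K-way merge of nstreams pre-sorted streams into out[]; returns count. */
int kway_merge(Stream *st, int nstreams, int *out)
{
    int *heap = malloc(sizeof(int) * nstreams);
    int nheap = 0, nout = 0;

    /* Seed the heap with every non-empty stream, then heapify. */
    for (int i = 0; i < nstreams; i++)
        if (st[i].len > 0)
            heap[nheap++] = i;
    for (int i = nheap / 2 - 1; i >= 0; i--)
        sift_down(heap, nheap, st, i);

    while (nheap > 0)
    {
        Stream *top = &st[heap[0]];
        out[nout++] = top->vals[top->pos++];   /* emit the smallest head */
        if (top->pos == top->len)
            heap[0] = heap[--nheap];           /* stream drained: drop it */
        sift_down(heap, nheap, st, 0);         /* re-sift the new top */
    }
    free(heap);
    return nout;
}
```

Each output tuple costs one O(log k) heap operation for k inputs; the real implementation replaces the integer comparison with a SortSupport comparator per merge key and pulls slots from tuplestores, but the control flow is the same.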

Follow up changes

Because the final result set still flows through the default tuple store in addition to the per-task stores, memory consumption increases when this change is enabled. Follow-up work to stop using the default tuple store and reduce that memory consumption is done in #8545.

Safety properties

  • Plan-time only: The GUC is consulted only during planning. The executor reads only distributedPlan->useSortedMerge. Cached plans are safe.
  • No Task mutation: Per-task dispatch state lives on DistributedExecution (execution-local), not on reusable Task nodes. Only task->totalReceivedTupleData is updated (execution-time reporting field, reset each execution).
  • Scan contract preserved: The merge writes into the existing final tuplestore. CitusExecScan, CitusEndScan, CitusReScan, ReturnTupleFromTuplestore are all unchanged. Quals, projection, backward scan, and cursor support work exactly as before.
  • Shared intermediate-result accounting: All per-task destinations share a single TupleDestinationStats object, preserving citus.max_intermediate_result_size enforcement semantics. EnsureIntermediateSizeLimitNotExceeded() is now exported from tuple_destination.c for use by the dispatch destination.
  • Aggregate ORDER BY exclusion: Queries with ORDER BY on aggregates are excluded from sorted merge eligibility via HasOrderByAggregate().

Files changed

File — Change
sorted_merge.h / sorted_merge.c — NEW: CreatePerTaskDispatchDest, MergePerTaskStoresIntoFinalStore, MergeHeapComparator
multi_logical_optimizer.c — Worker sort pushdown + eligibility check + SortClauseListsMatch()
multi_physical_planner.c — SetSortedMergeFields() + BuildSortedMergeKeys()
combine_query_planner.c — Set pathkeys on CustomPath
adaptive_executor.c — Per-task store routing + post-merge into final tuplestore
multi_physical_planner.h — SortedMergeKey struct + fields on DistributedPlan and MultiExtendedOp
tuple_destination.h / .c — Export EnsureIntermediateSizeLimitNotExceeded()
shared_library_init.c / multi_executor.c / .h — GUC registration
citus_outfuncs.c / citus_copyfuncs.c — Serialization of new plan fields
multi_orderby_pushdown.sql / .out — NEW: 60+ regression tests

Test coverage

The new multi_orderby_pushdown regression test covers:

  • Eligibility: 10 EXPLAIN tests verifying worker Sort is pushed for eligible queries (simple ORDER BY, DESC, NULLS ordering, multi-column, mixed directions, GROUP BY dist_col, WHERE+ORDER BY, expressions, LIMIT)
  • Ineligibility: 4 EXPLAIN tests verifying Sort is NOT pushed for ineligible queries (ORDER BY aggregate, GROUP BY non-dist col)
  • Correctness: 8 GUC off/on result-comparison pairs (ASC, DESC, multi-column, non-dist col, GROUP BY dist_col, mixed directions, WHERE, aggregates in SELECT)
  • Complex queries: Subquery, CTE, co-located JOIN, UNION ALL, DISTINCT, DISTINCT ON, EXISTS, IN subquery, multiple aggregates, CASE, NULL ordering, OFFSET, ordinal ORDER BY
  • Sort elision: EXPLAIN verification that coordinator Sort node is absent with GUC on
  • Plan cache: PREPARE/EXECUTE with GUC toggling (plan-time decision baked in)
  • Cursor: FETCH FORWARD + FETCH BACKWARD over sorted merge results
  • EXPLAIN ANALYZE: Falls back to non-merge path
  • Memory pressure: Small work_mem (64kB) with 32 shards
  • Intermediate result limits: max_intermediate_result_size with CTE subplan
  • Subplan interactions: 7 tests for CTE/subquery patterns with sorted merge (multiple CTEs, cross-joins, nested subplans, correctness comparison)
  • Subplan EXPLAIN: Query plans for all subplan patterns

Validated with citus.enable_sorted_merge globally enabled: 0 crashes across check-multi (192 tests) and check-multi-1 (210 tests). All failures are expected plan-shape diffs (Sort node elision in EXPLAIN output).

@codecov

codecov bot commented Mar 24, 2026

Codecov Report

❌ Patch coverage is 17.46032% with 156 lines in your changes missing coverage. Please review.
✅ Project coverage is 41.11%. Comparing base (029f381) to head (b4f9fe5).
⚠️ Report is 1 commit behind head on main.

❗ There is a different number of reports uploaded between BASE (029f381) and HEAD (b4f9fe5).

HEAD has 81 uploads less than BASE
Additional details and impacted files
@@             Coverage Diff             @@
##             main    #8529       +/-   ##
===========================================
- Coverage   88.91%   41.11%   -47.80%     
===========================================
  Files         286      287        +1     
  Lines       63198    63146       -52     
  Branches     7933     7878       -55     
===========================================
- Hits        56190    25961    -30229     
- Misses       4734    34567    +29833     
- Partials     2274     2618      +344     

Var *partitionKey, PartitionType partitionType,
Oid baseRelationId,
BoundaryNodeJobType boundaryNodeJobType);
static SortedMergeKey * BuildSortedMergeKeys(List *sortClauseList,
Author


Investigate if we can use the job queries to compute this at the site where it is used, instead of hanging it off the DistributedPlan

neildsh added 15 commits April 14, 2026 20:03
Phase 1 of the sorted-merge feature. This commit adds the data structures and GUC needed by later phases, with zero behavioral changes:
- SortedMergeKey typedef in multi_physical_planner.h describing one
  sort key for the coordinator k-way merge
- useSortedMerge, sortedMergeKeys[], sortedMergeKeyCount fields on
  DistributedPlan (plan-time decision, never checked at runtime via GUC)
- sortedMergeEligible field on MultiExtendedOp (logical optimizer tag
  read by the physical planner)
- Hidden GUC citus.enable_sorted_merge (PGC_SUSET, default off,
  GUC_NO_SHOW_ALL) consulted only during planning
- Serialization in citus_outfuncs.c and deep-copy in citus_copyfuncs.c
  for all new fields

All new fields default to false/0/NULL. Existing regression tests are
unaffected.

Co-authored-by: Copilot
Phase 2 of the sorted-merge feature. Workers now sort their results
when citus.enable_sorted_merge is enabled at planning time, even for
queries without LIMIT. The plan metadata is populated so later phases
can execute the merge and set pathkeys.

Logical optimizer changes (multi_logical_optimizer.c):
- WorkerSortClauseList() gains an early-return path that pushes the
  sort clause to workers when the GUC is on and the sort is safe
  (no aggregates in ORDER BY, no non-pushable window functions,
  and either no GROUP BY or GROUP BY on partition column).
- WorkerExtendedOpNode() sets sortedMergeEligible = true when the
  worker sort clause semantically matches the original sort clause,
  using the new SortClauseListsMatch() helper.
- SortClauseListsMatch() compares tleSortGroupRef, sortop,
  nulls_first, and eqop for each pair.

Physical planner changes (multi_physical_planner.c):
- CreatePhysicalDistributedPlan() finds the worker MultiExtendedOp
  with sortedMergeEligible = true, builds SortedMergeKey metadata
  from the worker job query, and sets useSortedMerge on the plan.
- BuildSortedMergeKeys() constructs the key array from the worker
  query's SortGroupClause list and target list.

The coordinator Sort node is still present above the CustomScan
(pathkeys not set yet — that is Phase 4). Results are correct
because the redundant Sort re-sorts already-sorted data.

Co-authored-by: Copilot
Phase 3 of the sorted-merge feature. When distributedPlan->useSortedMerge
is true (set at planning time by Phase 2), the adaptive executor now:
1. Routes worker results into per-task tuple stores via a new
   PerTaskDispatchTupleDest that dispatches by task->taskId hash lookup.
   No Task fields are mutated — all state lives on DistributedExecution.
2. After all tasks complete, performs a k-way merge of the per-task stores
   into the final scanState->tuplestorestate using PostgreSQL's public
   binaryheap and SortSupport APIs.
3. Frees per-task stores after the merge.

The existing CitusExecScan/ReturnTupleFromTuplestore/CitusEndScan/
CitusReScan code paths are completely unchanged — they read from
the final tuplestore exactly as before.

New files:
- sorted_merge.h: CreatePerTaskDispatchDest, MergePerTaskStoresIntoFinalStore
- sorted_merge.c: PerTaskDispatchTupleDest with taskId->index hash routing,
  MergePerTaskStoresIntoFinalStore with binaryheap merge, MergeHeapComparator
  modeled after PG's heap_compare_slots in nodeMergeAppend.c
Modified:
- adaptive_executor.c: DistributedExecution gains useSortedMerge/perTaskStores/
  perTaskStoreCount fields. AdaptiveExecutor() branches on useSortedMerge to
  create per-task stores, then merges post-execution. EXPLAIN ANALYZE falls
  back to existing single-tuplestore path.
Safety:
- Shared TupleDestinationStats preserves citus.max_intermediate_result_size
- Per-task stores allocated in AdaptiveExecutor local memory context
  (auto-cleanup on error via PG memory context teardown)
- task->totalReceivedTupleData tracking preserved

The coordinator Sort node is still present above the CustomScan (pathkeys
not set until Phase 4). Results are correct because the redundant Sort
re-sorts already-sorted data.
Co-authored-by: Copilot
…n indication of sorted merge in the EXPLAIN output
*/
bool useSortedMerge;
Tuplestorestate **perTaskStores;
int perTaskStoreCount;
Contributor


Suggestion: keep sorted merge state in a data structure that hangs off CitusScanState. That data structure can be private to sorted_merge, and sorted_merge has something like this strawman API:

void InitSortedMerge(CitusScanState *, tasklist, sort clause, ...);
bool SortedMergeActive(CitusScanState*);
TupleTableSlot *ReturnTupleFromSortedMerge(CitusScanState*); // see 2nd comment in `sorted_merge.c`
void CleanupSortedMergeState(struct CitusScanState *scanState);

This fits into the canonical Postgres flow - for example per-task tuplestores are closed in the Cleanup function, which would be called by CitusEndScan() and CitusReScan(), as opposed to an if statement in adaptive executor, which looks a bit ad-hoc tbh.

*perTaskStoresOut = perTaskStores;
*perTaskStoreCountOut = taskCount;

return (TupleDestination *) dispatch;
Contributor


I like the idea of implementing a TupleDestination, but can't help thinking a simpler and equally effective approach could reuse the current implementation: initialize each task->tupleDest with CreateTupleStoreTupleDest(), and execution (specifically ReceiveResults()) will just route to that.
The defaultTupleDest is bypassed, but there's no hashtable indirection, no code duplication (PerTaskDispatchPutTuple() largely clones the current implementation), and less state to carry around.

* Each per-task store must contain tuples sorted by the given merge keys.
* The output tuplestore will contain all tuples in globally sorted order.
*
* Uses PostgreSQL's public binaryheap and SortSupport APIs.
Contributor


Given that this function is called in AdaptiveExecutor(), is it the case that the merge is run to completion before the first row is returned? An alternative would be to do the merge on demand (or lazily) after AdaptiveExecutor() has completed, in CitusExecScan(), with something like:

    if (SortedMergeActive(scanState)
    {
         return ReturnTupleFromSortedMerge(scanState); // does the merge incrementally
    }

    return ReturnTupleFromTuplestore(scanState);

This enables earlier return for LIMIT queries - LIMIT N requires merging only N tuples, and a FETCH 1 merges just 1 tuple, as opposed to merging all tuples before returning the first. This requires maintaining state (heap, slots, sort keys) for the sorted merge across calls, but that memory is small compared to the tuples themselves. And since the per-task tuplestores do not have random access enabled, Postgres trims them as the merge proceeds.
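The incremental pull model described in this comment can be sketched standalone: the heap and stream cursors live in a state struct that persists across calls, and each call pops exactly one value, mirroring the hypothetical ReturnTupleFromSortedMerge() above. Plain C, illustrative names only:

```c
#include <stdlib.h>
#include <stdbool.h>

/* One pre-sorted input, standing in for a per-task tuplestore. */
typedef struct Stream { const int *vals; int len; int pos; } Stream;

/* State kept alive across calls, e.g. hung off the scan state. */
typedef struct MergeState
{
    Stream *streams;
    int *heap;        /* stream indices; smallest current value at heap[0] */
    int nheap;
} MergeState;

/* Current value of the stream at heap slot h. */
static int cur(const MergeState *ms, int h)
{
    const Stream *s = &ms->streams[ms->heap[h]];
    return s->vals[s->pos];
}

/* Restore the min-heap property below index i. */
static void sift_down(MergeState *ms, int i)
{
    for (;;)
    {
        int l = 2 * i + 1, r = 2 * i + 2, m = i;
        if (l < ms->nheap && cur(ms, l) < cur(ms, m)) m = l;
        if (r < ms->nheap && cur(ms, r) < cur(ms, m)) m = r;
        if (m == i) return;
        int t = ms->heap[i]; ms->heap[i] = ms->heap[m]; ms->heap[m] = t;
        i = m;
    }
}

/* Build the heap once, up front; analogous to InitSortedMerge(). */
void merge_init(MergeState *ms, Stream *streams, int nstreams)
{
    ms->streams = streams;
    ms->heap = malloc(sizeof(int) * nstreams);
    ms->nheap = 0;
    for (int i = 0; i < nstreams; i++)
        if (streams[i].len > 0)
            ms->heap[ms->nheap++] = i;
    for (int i = ms->nheap / 2 - 1; i >= 0; i--)
        sift_down(ms, i);
}

/* Pop exactly one value per call; returns false once all inputs drain.
 * A LIMIT N consumer calls this only N times, touching only ~N tuples. */
bool merge_next(MergeState *ms, int *out)
{
    if (ms->nheap == 0)
        return false;
    Stream *top = &ms->streams[ms->heap[0]];
    *out = top->vals[top->pos++];
    if (top->pos == top->len)
        ms->heap[0] = ms->heap[--ms->nheap];   /* drained: drop stream */
    sift_down(ms, 0);
    return true;
}
```

The batch version does the same work eagerly; the only extra cost here is keeping the small MergeState (heap plus cursors) alive between calls, in exchange for early exit on LIMIT and cursor fetches.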

if (orderByLimitReference.groupClauseIsEmpty ||
orderByLimitReference.groupedByDisjointPartitionColumn)
{
return copyObject(sortClauseList);
Contributor


Suggestion: additionally return a flag through an out-parameter to indicate that pushdown of the sort clause to the workers is in effect:

      *sortPushdown = true;    // initialized to false
      return copyObject(sortClauseList);

This can be saved to the QueryOrderByLimit and then propagated to workerExtendedOpNode->sortedMergeEligible. It would require an additional flag in QueryOrderByLimit (and maybe rename that data structure also) but avoids the sort-merge eligibility calculation in WorkerExtendedOpNode().
