Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries #8529
neildsh wants to merge 18 commits into citusdata:main from
Conversation
Codecov Report: patch coverage details and impacted files below.

@@            Coverage Diff             @@
##             main     #8529       +/-   ##
===========================================
- Coverage   88.91%    41.11%    -47.80%
===========================================
  Files         286       287         +1
  Lines       63198     63146        -52
  Branches     7933      7878        -55
===========================================
- Hits        56190     25961     -30229
- Misses       4734     34567     +29833
- Partials     2274      2618       +344
Var *partitionKey, PartitionType partitionType,
Oid baseRelationId,
BoundaryNodeJobType boundaryNodeJobType);
static SortedMergeKey * BuildSortedMergeKeys(List *sortClauseList,
Investigate whether we can use the job queries to compute this at the site where it is used, instead of hanging it off the DistributedPlan.
Phase 1 of the sorted-merge feature. This commit adds the data structures and GUC needed by later phases, with zero behavioral changes:

- SortedMergeKey typedef in multi_physical_planner.h describing one sort key for the coordinator k-way merge
- useSortedMerge, sortedMergeKeys[], sortedMergeKeyCount fields on DistributedPlan (plan-time decision, never checked at runtime via GUC)
- sortedMergeEligible field on MultiExtendedOp (logical optimizer tag read by the physical planner)
- Hidden GUC citus.enable_sorted_merge (PGC_SUSET, default off, GUC_NO_SHOW_ALL) consulted only during planning
- Serialization in citus_outfuncs.c and deep copy in citus_copyfuncs.c for all new fields

All new fields default to false/0/NULL. Existing regression tests are unaffected.

Co-authored-by: Copilot
Phase 2 of the sorted-merge feature. Workers now sort their results when citus.enable_sorted_merge is enabled at planning time, even for queries without LIMIT. The plan metadata is populated so later phases can execute the merge and set pathkeys.

Logical optimizer changes (multi_logical_optimizer.c):
- WorkerSortClauseList() gains an early-return path that pushes the sort clause to workers when the GUC is on and the sort is safe (no aggregates in ORDER BY, no non-pushable window functions, and either no GROUP BY or GROUP BY on the partition column).
- WorkerExtendedOpNode() sets sortedMergeEligible = true when the worker sort clause semantically matches the original sort clause, using the new SortClauseListsMatch() helper.
- SortClauseListsMatch() compares tleSortGroupRef, sortop, nulls_first, and eqop for each pair.

Physical planner changes (multi_physical_planner.c):
- CreatePhysicalDistributedPlan() finds the worker MultiExtendedOp with sortedMergeEligible = true, builds SortedMergeKey metadata from the worker job query, and sets useSortedMerge on the plan.
- BuildSortedMergeKeys() constructs the key array from the worker query's SortGroupClause list and target list.

The coordinator Sort node is still present above the CustomScan (pathkeys are not set yet; that is Phase 4). Results are correct because the redundant Sort re-sorts already-sorted data.

Co-authored-by: Copilot
Phase 3 of the sorted-merge feature. When distributedPlan->useSortedMerge is true (set at planning time by Phase 2), the adaptive executor now:

1. Routes worker results into per-task tuple stores via a new PerTaskDispatchTupleDest that dispatches by task->taskId hash lookup. No Task fields are mutated; all state lives on DistributedExecution.
2. After all tasks complete, performs a k-way merge of the per-task stores into the final scanState->tuplestorestate using PostgreSQL's public binaryheap and SortSupport APIs.
3. Frees the per-task stores after the merge.

The existing CitusExecScan/ReturnTupleFromTuplestore/CitusEndScan/CitusReScan code paths are completely unchanged; they read from the final tuplestore exactly as before.

New files:
- sorted_merge.h: CreatePerTaskDispatchDest, MergePerTaskStoresIntoFinalStore
- sorted_merge.c: PerTaskDispatchTupleDest with taskId->index hash routing, MergePerTaskStoresIntoFinalStore with binaryheap merge, MergeHeapComparator modeled after PG's heap_compare_slots in nodeMergeAppend.c

Modified:
- adaptive_executor.c: DistributedExecution gains useSortedMerge/perTaskStores/perTaskStoreCount fields. AdaptiveExecutor() branches on useSortedMerge to create per-task stores, then merges post-execution. EXPLAIN ANALYZE falls back to the existing single-tuplestore path.

Safety:
- Shared TupleDestinationStats preserves citus.max_intermediate_result_size
- Per-task stores are allocated in AdaptiveExecutor's local memory context (auto-cleanup on error via PG memory context teardown)
- task->totalReceivedTupleData tracking is preserved

The coordinator Sort node is still present above the CustomScan (pathkeys are not set until Phase 4). Results are correct because the redundant Sort re-sorts already-sorted data.

Co-authored-by: Copilot
…cases to impose a deterministic order
…n indication of sorted merge in the EXPLAIN output
*/
bool useSortedMerge;
Tuplestorestate **perTaskStores;
int perTaskStoreCount;
Suggestion: keep sorted merge state in a data structure that hangs off CitusScanState. That data structure can be private to sorted_merge, and sorted_merge has something like this strawman API:
void InitSortedMerge(CitusScanState *scanState, List *taskList, List *sortClauseList, ...);
bool SortedMergeActive(CitusScanState *scanState);
TupleTableSlot * ReturnTupleFromSortedMerge(CitusScanState *scanState); /* see 2nd comment in sorted_merge.c */
void CleanupSortedMergeState(CitusScanState *scanState);
This fits the canonical Postgres flow: for example, per-task tuplestores are closed in the Cleanup function, which would be called by CitusEndScan() and CitusReScan(), as opposed to an if statement in the adaptive executor, which looks a bit ad hoc, tbh.
*perTaskStoresOut = perTaskStores;
*perTaskStoreCountOut = taskCount;

return (TupleDestination *) dispatch;
I like the idea of implementing a TupleDestination, but I can't help thinking a simpler and equally effective approach could reuse the current implementation: initialize each task->tupleDest with CreateTupleStoreTupleDest(), and execution (specifically ReceiveResults()) will route tuples there on its own.
The defaultTupleDest is bypassed, but there is no hashtable indirection, no code duplication (PerTaskDispatchPutTuple() largely clones the current implementation), and less state to carry around.
* Each per-task store must contain tuples sorted by the given merge keys.
* The output tuplestore will contain all tuples in globally sorted order.
*
* Uses PostgreSQL's public binaryheap and SortSupport APIs.
Given that this function is called in AdaptiveExecutor(), is it the case that the merge runs to completion before the first row is returned? An alternative would be to do the merge on demand (or lazily) after AdaptiveExecutor() has completed, in CitusExecScan(), with something like:
if (SortedMergeActive(scanState))
{
	return ReturnTupleFromSortedMerge(scanState); /* does the merge incrementally */
}
return ReturnTupleFromTuplestore(scanState);
This enables earlier returns for LIMIT queries: LIMIT N requires merging only N tuples, and FETCH 1 merges a single tuple, as opposed to merging all tuples before returning the first. It requires maintaining state (heap, slots, sort keys) for the sorted merge across calls, but that memory is small compared to the tuples. And since the per-task tuplestores do not have random access enabled, Postgres trims them as the merge proceeds.
if (orderByLimitReference.groupClauseIsEmpty ||
	orderByLimitReference.groupedByDisjointPartitionColumn)
{
	return copyObject(sortClauseList);
Suggestion: additionally return a flag through an output parameter to indicate that pushdown of the sort clause to the workers is in effect:
*sortPushdown = true; /* initialized to false by the caller */
return copyObject(sortClauseList);
This can be saved to the QueryOrderByLimit and then propagated to workerExtendedOpNode->sortedMergeEligible. It would require an additional flag in QueryOrderByLimit (and maybe renaming that data structure as well) but avoids the sorted-merge eligibility calculation in WorkerExtendedOpNode().
Summary
When a multi-shard `SELECT ... ORDER BY` query goes through the logical planner, Citus currently collects all worker results into a single tuplestore and relies on a PostgreSQL `Sort` node above the `Custom Scan` to produce the final ordering. This PR adds an alternative path: push `ORDER BY` to workers, k-way merge the pre-sorted worker results on the coordinator using a binary heap, and declare `pathkeys` on the `CustomPath` so PostgreSQL eliminates the coordinator `Sort` node entirely.

The feature is gated behind a hidden GUC `citus.enable_sorted_merge` (default `off`, `PGC_SUSET`, `GUC_NO_SHOW_ALL`). All eligibility decisions are made at planning time and baked into the serialized `DistributedPlan`, so cached/prepared plans remain correct regardless of later GUC changes. The sorted merge implementation is based on the Postgres MergeAppend node.
How it works
Worker sort pushdown (`multi_logical_optimizer.c`): `WorkerSortClauseList()` gains an early-return path that pushes ORDER BY to workers even without LIMIT, when the GUC is on and the sort is safe (no aggregates in ORDER BY, no non-pushable window functions, GROUP BY on the distribution column or absent).

Planner eligibility (`multi_logical_optimizer.c`, `multi_physical_planner.c`): `WorkerExtendedOpNode()` tags the worker `MultiExtendedOp` with `sortedMergeEligible = true` when the worker sort clause semantically matches the original. `SetSortedMergeFields()` in the physical planner builds `SortedMergeKey` metadata (attno, sortop, collation, nullsFirst) and sets `useSortedMerge` on the `DistributedPlan`.

PathKeys (`combine_query_planner.c`): `CreateCitusCustomScanPath()` sets `path->pathkeys = root->sort_pathkeys` when the plan has `useSortedMerge = true`, causing PostgreSQL's `create_ordered_paths()` to skip adding a Sort node.

Per-task stores + k-way merge (`sorted_merge.c`, `adaptive_executor.c`): A new `PerTaskDispatchTupleDest` routes worker tuples to per-task tuplestores by `taskId` hash lookup (no Task fields are mutated). After all tasks complete, `MergePerTaskStoresIntoFinalStore()` performs a k-way merge using PostgreSQL's public `binaryheap` and `SortSupport` APIs, writing sorted output into the existing `scanState->tuplestorestate`. The existing `CitusExecScan()` / `ReturnTupleFromTuplestore()` path is completely unchanged.

Follow up changes
Because we continue to use the default tuplestore for the final result set in addition to the per-task stores, memory consumption increases when this feature is enabled. Follow-up work is to stop using the default tuplestore to reduce memory consumption; this is done in #8545.
Safety properties
- All eligibility decisions are recorded at planning time in `distributedPlan->useSortedMerge`. Cached plans are safe.
- Per-task merge state lives on `DistributedExecution` (execution-local), not on reusable `Task` nodes. Only `task->totalReceivedTupleData` is updated (an execution-time reporting field, reset each execution).
- `CitusExecScan`, `CitusEndScan`, `CitusReScan`, and `ReturnTupleFromTuplestore` are all unchanged. Quals, projection, backward scan, and cursor support work exactly as before.
- Per-task destinations share a single `TupleDestinationStats` object, preserving `citus.max_intermediate_result_size` enforcement semantics. `EnsureIntermediateSizeLimitNotExceeded()` is now exported from `tuple_destination.c` for use by the dispatch destination.
- Aggregates in ORDER BY are detected via `HasOrderByAggregate()`.

Files changed
- `sorted_merge.h` / `sorted_merge.c`: `CreatePerTaskDispatchDest`, `MergePerTaskStoresIntoFinalStore`, `MergeHeapComparator`
- `multi_logical_optimizer.c`: `SortClauseListsMatch()`
- `multi_physical_planner.c`: `SetSortedMergeFields()` + `BuildSortedMergeKeys()`
- `combine_query_planner.c`: `pathkeys` on `CustomPath`
- `adaptive_executor.c`
- `multi_physical_planner.h`: `SortedMergeKey` struct + fields on `DistributedPlan` and `MultiExtendedOp`
- `tuple_destination.h` / `tuple_destination.c`: `EnsureIntermediateSizeLimitNotExceeded()`
- `shared_library_init.c` / `multi_executor.c` / `multi_executor.h`
- `citus_outfuncs.c` / `citus_copyfuncs.c`
- `multi_orderby_pushdown.sql` / `multi_orderby_pushdown.out`

Test coverage
The new `multi_orderby_pushdown` regression test covers:
- `max_intermediate_result_size` with a CTE subplan

Validated with `citus.enable_sorted_merge` globally enabled: 0 crashes across check-multi (192 tests) and check-multi-1 (210 tests). All failures are expected plan-shape diffs (Sort node elision in EXPLAIN output).