Limit pushdown with filters + sort-aware reverse output#7
Draft
ch-sc wants to merge 2 commits into
Draft
Conversation
Signed-off-by: Christoph Schulze <christoph.schulze@polygon.io>
Signed-off-by: Christoph Schulze <christoph.schulze@polygon.io>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Two related perf gaps surfaced while investigating reverse order scans:
LIMITis silently dropped when a filter is set.ScanBuilderbailed outright onfilter + limit, andVortexOpenerworked around it by gatingwith_limitonfilter.is_none(). The result was thatSELECT … WHERE x > k LIMIT nalways scanned every file end-to-end.ORDER BY ts DESCagainst a table declaredWITH ORDER (ts ASC)doesn't exploit reverse scanning.VortexSource::with_reversedalready existed andScanBuilderalready supported reverse intra-file scans, but noFileSource::try_reverse_outputhook was wired up. So DataFusion always inserted a fullSortExecinstead of letting the source absorb the reversal.Together they meant that the canonical "latest N matching rows" query (e.g.
SELECT * FROM t WHERE x > k ORDER BY ts DESC LIMIT 100over time-partitioned files) read every file and every row. After the changes in this PR it typically reads only the last file, short-circuiting once the limit is satisfied.SELECT * FROM t WHERE x > k ORDER BY ts DESC LIMIT 100over time-partitionedfiles now executes as:
ORDER BY ts DESC, table's output ordering ists ASC. CallsVortexSource::try_pushdown_sort.Exactwithreversed=true. DataFusion reversesfile_groups(so the latest file comes first) and removes theSortExec.x > kviatry_pushdown_filters(already in place).LIMIT 100viawith_fetchonDataSourceExec(already in place; passes through tobase_config.limit).filter.is_none()guard).remaining = 40.remaining = 40, reserves 40, exits early after enough rows. Done.reversedis now surfaced infmt_extraso plan rendering showsreversed: true, which makes the pushdown observable in tests.Edge cases.
Unsupported.is_reverseis strict; we returnUnsupported, the planner falls back to a fullSortExec. Correct, just not optimal.reversed = !reversed).Risks & limitations
compare_exchange_weakallows spurious failures; bounded by physical concurrency. 32-thread stress test (reserve_up_to_concurrent_never_oversubscribes) exercises this.LimitExectruncates. We pay extra IO, never wrong rows.ORDER BY ts DESC, id ASC(multi-key with mixed direction)is_reverseis strict; we returnUnsupported, planner falls back toSortExec. Correct, just not optimal.SortExeceven after reverse pushdownInexact(Parquet does the same on 52). The TopK still benefits fromfetchpushdown and from the source emitting locally-sorted streams.API Changes
VortexSource::with_reversed(bool) -> SelfVortexSource::try_reverse_output(&self, &[PhysicalSortExpr], &EquivalenceProperties) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>>ScanBuilder::with_reversed(bool) -> SelfandScanBuilder::reversed(&self) -> bool(already existed in the type, now stamped into the public-api lock).public-api.lockfiles regenerated viacargo xtask public-api.Testing
vortex-scan/src/tasks.rs(CAS unit tests):reserve_up_to_drains_counter— sequential reservations drain to zero.reserve_up_to_concurrent_never_oversubscribes— 32-thread stress proves theCAS loop never lets the counter underflow or hand out more than the initial
budget.
vortex-file/src/tests.rs(filter + limit at the file level):limit_with_filter_truncates_globally— multi-chunk scan with a filter andlimit=7returns exactly 7 rows. (Previously this combination would havebailed in
ScanBuilder::prepare.)limit_zero_with_filter_yields_no_rows—limit=0short-circuits even whena filter is present.
vortex-datafusion/src/persistent/mod.rs(sort pushdown):reverse_pushdown_swaps_file_order—WITH ORDER (ts ASC)+ORDER BY ts DESC LIMIT kproduces the largest k values, the plan showsSortExec(TopK)and the source is markedreversed: true.reverse_pushdown_rejected_for_unrelated_ordering—WITH ORDER (a ASC),query asks
ORDER BY b DESC. Plan keeps theSortand source is notreversed.
reverse_pushdown_no_ordering_declared— noWITH ORDER→ no reversepushdown.
reverse_pushdown_then_filter— filter pushdown still works alongsidereverse sort pushdown.
order_by_desc_limit_returns_largest_rows— end-to-end synergy: 5 files of100 rows each, filter +
ORDER BY ts DESC LIMIT 10returns ts[490..500]and the source is reversed in the plan.