QuickwitDatafusion #6160
Draft: alexanderbianchi wants to merge 19 commits into quickwit-oss:main from alexanderbianchi:bianchi/quickwitdf
Commits (19):
5e0eab0 basic metastore integration and distributed planning
a7fa135 integrate quickwit with datafusion via Flight
5dd15c3 add basic type coercion
d3ac8c0 upgrade prost
bfc6492 fix: serialize pushed filters across workers, add StorageSplitOpener
f6662dd feat: add QuickwitSchemaProvider catalog for lazy index resolution
2aeb96f feat: pass tokenizer manager through StorageSplitOpener
d2fa808 fix: remove unnecessary RepartitionExec in join plans
85d306c feat: rewrite integration test with real splits + storage + catalog
5923249 feat: add warmup for storage-backed splits, fix segment sizes
37a228f fix: cache opened Index per split, warmup once not twice
6adb196 fix: targeted warmup per DataSource, no over-fetching
15491d7 feat: add demo binary showing all three query paths
0cb07e5 feat: QuickwitTableProvider uses UnifiedTantivyTableProvider
7c8a5ec fix: print physical plans in demo 2 and demo 3
23697aa fix: resolve all cargo warnings
85b9b55 feat: demo 2 uses single 'logs' table with distributed plan
69c60c4 fix: full_text filter pushdown through QuickwitTableProvider
5f3059f fix: demo 3 uses distributed session and shows stage plan
`Cargo.toml` (new file, +52 lines):

```toml
[package]
name = "quickwit-datafusion"
description = "Distributed DataFusion execution for Quickwit"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
dashmap = "6"
futures = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
url = "2"

tantivy = { workspace = true }
tantivy-datafusion = { path = "/Users/alex.bianchi/oss/tantivy/.worktrees/bianchi/tantivydf/tantivy-datafusion" }

quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-query = { workspace = true }
quickwit-search = { workspace = true }
quickwit-storage = { workspace = true }

datafusion = "52"
datafusion-datasource = "52"
datafusion-physical-plan = "52"
datafusion-proto = "52"
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed" }
arrow = { version = "57", features = ["prettyprint"] }
arrow-flight = "57"

[dev-dependencies]
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", features = ["integration"] }
dashmap = "6"
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true }
quickwit-indexing = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-search = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true }
tokio = { workspace = true, features = ["test-util", "macros"] }
```
# TODO: Distributed DataFusion Execution

## Plan Quality

### Unnecessary RepartitionExec in join plans
The distributed plans show `RepartitionExec: Hash([_doc_id, _segment_ord], 3)` between every DataSource and its HashJoin. This is wrong — inv, f, and d within a single split are already co-partitioned by segment (they declare `Hash([_doc_id, _segment_ord], N)` partitioning). The optimizer shouldn't add shuffles for co-partitioned joins.

Likely cause: `target_partitions` on the distributed context doesn't match the segment count, so DF thinks it needs to repartition. With `target_partitions=1` locally the plan uses `CollectLeft` mode and no repartitions. Need to either:
- Set `target_partitions` = segment count on the distributed context
- Or teach the distributed planner to respect co-partitioned DataSource declarations
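The first option can be sketched as a small helper (the name and shape are hypothetical, assuming each split's providers declare `Hash([_doc_id, _segment_ord], N)` with N = segment count):

```rust
/// Hypothetical helper: choose `target_partitions` for the distributed
/// context from the segment counts of the splits being scanned, so the
/// declared Hash([_doc_id, _segment_ord], N) partitioning lines up and no
/// RepartitionExec is inserted between co-partitioned DataSources.
fn target_partitions_for(segment_counts: &[usize]) -> usize {
    segment_counts.iter().copied().max().unwrap_or(1).max(1)
}

fn main() {
    // Splits with 1, 4, and 2 segments: plan with 4 partitions.
    assert_eq!(target_partitions_for(&[1, 4, 2]), 4);
    // No splits known yet: fall back to a single partition.
    assert_eq!(target_partitions_for(&[]), 1);
}
```

The computed value would then be applied to the distributed session's config before planning.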

### Explicit UNION ALL vs plan-level decomposition
Currently each split's join plan is written as explicit SQL `UNION ALL`. This means the planner sees N copies of the same join pattern. A better approach:
- `QuickwitTableProvider` registers a single logical table backed by all splits
- The planner produces one join plan
- The distributed optimizer decomposes it by split at the physical level
- Each split's join subtree becomes a task assigned to a worker

This would let df-distributed's `DistributedPhysicalOptimizerRule` handle the split-to-worker mapping natively instead of the SQL manually encoding it.

### CollectLeft vs Partitioned join mode
With `target_partitions=1` the local plan uses `CollectLeft` (broadcast build side), which is correct for the inv⋈f pattern — the inverted index result is small. The distributed plan uses `Partitioned` mode because the context has a higher `target_partitions`. Need to control this so the join strategy matches the data characteristics.

## Worker/Split/Segment Mapping

### How many workers per split? Per segment?
Currently: 1 worker per split. Each split may have multiple segments. Segments within a split are co-partitioned and joined locally on the worker.

Open questions:
- Should a split with many segments be split across multiple workers? (probably not — segments within a split share the same index and need co-partitioned joins)
- Should multiple small splits be assigned to the same worker? (yes — df-distributed's task assignment should batch them)
- How does the coordinator know how many workers are available? (`WorkerResolver::get_urls()` — need to implement for Quickwit cluster)
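The batching question can be illustrated with a plain greedy assignment (an illustrative sketch with made-up names; the real mapping would go through df-distributed's task assignment):

```rust
/// Illustrative greedy assignment: each split goes, largest first, to the
/// currently least-loaded worker, so many small splits batch onto the same
/// worker instead of each claiming one.
fn assign_splits(splits: &[(&str, u64)], num_workers: usize) -> Vec<Vec<String>> {
    let mut assignments: Vec<Vec<String>> = vec![Vec::new(); num_workers];
    let mut loads = vec![0u64; num_workers];
    let mut sorted = splits.to_vec();
    sorted.sort_by_key(|&(_, size)| std::cmp::Reverse(size));
    for (split_id, size) in sorted {
        // min_by_key returns the first minimum, so ties go to lower worker ids.
        let (w, _) = loads.iter().enumerate().min_by_key(|&(_, load)| *load).unwrap();
        assignments[w].push(split_id.to_string());
        loads[w] += size;
    }
    assignments
}

fn main() {
    // One large split and three small ones on two workers: the small ones batch.
    let plan = assign_splits(&[("a", 100), ("b", 10), ("c", 10), ("d", 10)], 2);
    assert_eq!(plan[0], vec!["a".to_string()]);
    assert_eq!(plan[1], vec!["b".to_string(), "c".to_string(), "d".to_string()]);
}
```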

### Worker discovery
`start_localhost_context` hardcodes localhost workers. Production needs:
- `WorkerResolver` backed by Quickwit's cluster membership (Chitchat)
- Workers = Quickwit searcher nodes
- Split-to-worker affinity based on cache locality
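One common way to get cache-locality affinity is rendezvous (HRW) hashing: a split keeps mapping to the same searcher while that node is alive, and only splits owned by a departed node move. A minimal sketch, assuming the worker list would come from a `WorkerResolver` (function names here are hypothetical):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Rendezvous hashing: each worker gets a score for the split, the highest
/// score wins. Removing a non-winning worker never moves the split.
fn worker_for_split(split_id: &str, workers: &[&str]) -> Option<String> {
    workers
        .iter()
        .max_by_key(|worker| {
            let mut hasher = DefaultHasher::new();
            (split_id, **worker).hash(&mut hasher);
            hasher.finish()
        })
        .map(|worker| worker.to_string())
}

fn main() {
    let workers = ["searcher-1", "searcher-2", "searcher-3"];
    let chosen = worker_for_split("split-abc", &workers).unwrap();
    // Deterministic: the same split always lands on the same worker.
    assert_eq!(worker_for_split("split-abc", &workers).unwrap(), chosen);
    // Removing a worker that did not win leaves the split where it was.
    let loser = workers.iter().copied().find(|w| *w != chosen.as_str()).unwrap();
    let survivors: Vec<&str> = workers.iter().copied().filter(|w| *w != loser).collect();
    assert_eq!(worker_for_split("split-abc", &survivors).unwrap(), chosen);
}
```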

## Metastore Integration

### QuickwitTableProvider doesn't query the metastore
Currently takes a hand-built `Vec<Arc<SplitIndexOpener>>`. Production path:
- `QuickwitTableProvider::new(metastore, index_id)`
- At `scan()` time, call `metastore.list_splits()` to discover splits
- Create a `SplitIndexOpener` per split from split metadata
- Build per-split tantivy-df providers and compose the plan
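A minimal sketch of that flow, with stand-in types (`Metastore`, `SplitMeta`) in place of the real quickwit-metastore API:

```rust
/// Stand-ins for the real quickwit-metastore types, used only to sketch the
/// shape of lazy split discovery at scan() time.
#[derive(Clone)]
struct SplitMeta {
    split_id: String,
}

trait Metastore {
    fn list_splits(&self, index_id: &str) -> Vec<SplitMeta>;
}

struct SplitIndexOpener {
    split_id: String,
}

/// Instead of taking a hand-built list of openers up front, the provider
/// would resolve live splits from the metastore on every scan.
fn openers_for_scan(metastore: &dyn Metastore, index_id: &str) -> Vec<SplitIndexOpener> {
    metastore
        .list_splits(index_id)
        .into_iter()
        .map(|meta| SplitIndexOpener { split_id: meta.split_id })
        .collect()
}

fn main() {
    struct StaticMetastore(Vec<SplitMeta>);
    impl Metastore for StaticMetastore {
        fn list_splits(&self, _index_id: &str) -> Vec<SplitMeta> {
            self.0.clone()
        }
    }
    let metastore = StaticMetastore(vec![
        SplitMeta { split_id: "split-01".to_string() },
        SplitMeta { split_id: "split-02".to_string() },
    ]);
    let openers = openers_for_scan(&metastore, "logs");
    assert_eq!(openers.len(), 2);
    assert_eq!(openers[0].split_id, "split-01");
}
```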

### SplitIndexOpener doesn't open real splits
Currently backed by an in-memory `DashMap<String, Index>`. Production path:
- `open()` calls `open_index_with_caches()` from quickwit-search
- Downloads the split bundle from object storage (S3/GCS/Azure)
- Opens the tantivy index from the local cache or downloaded bundle
- Returns the opened `Index`

## Codec Gaps

### Pushed filters lost across serialization
`FastFieldDataSource` claims `PushedDown::Yes` for all filters, so DF removes the `FilterExec`. But the codec doesn't serialize pushed filters (they're `Arc<dyn PhysicalExpr>`, not trivially serializable). On the worker, the reconstructed DataSource has no filters.

Options:
- Serialize pushed filters via DF's PhysicalExpr proto support
- Change tantivy-df to return `PushedDown::No` so DF keeps FilterExec in the plan (it serializes fine as a built-in node)
- Encode the filter expressions as `Expr` in the proto and pass them to `scan()`

### Aggregation pushdown not tested in distributed
tantivy-df has `AggPushdown`, which replaces `AggregateExec` with `TantivyAggregateExec`. This is a custom ExecutionPlan node that the codec doesn't handle yet. Need to:
- Add `TantivyAggregateExec` encoding to `TantivyCodec`
- Or make it a DataSource variant the existing codec pattern handles
- Test partial aggregation on workers + final merge on the coordinator

## Optimizer Rules in Distributed Context

### tantivy-df optimizer rules not registered on workers
`FastFieldFilterPushdown`, `TopKPushdown`, `AggPushdown`, and `OrdinalGroupByOptimization` are registered on the coordinator's session but not on workers. Workers rebuild the plan from the codec, so they'd need these rules if the plan is further optimized on the worker side.

Currently this doesn't matter because the plan is fully optimized on the coordinator before distribution. But if df-distributed ever does worker-side re-optimization, the rules need to be registered in `build_worker_session_builder`.

## Testing

### Multi-segment splits
All current tests use single-segment splits (one commit). Need tests with multi-segment splits to verify:
- Segment-level co-partitioning in joins (N partitions per split)
- Correct partition mapping across inv/f/d providers
- Chunking behavior with target_partitions > segment count
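The partition-mapping check can be pinned down with a tiny helper the tests could assert against (hypothetical, not the tantivy-df API):

```rust
/// Flatten (split, segment_ord) into a global partition index so that the
/// inv/f/d providers for the same split agree on partition numbering:
/// split 0 owns partitions [0, n0), split 1 owns [n0, n0 + n1), and so on.
fn global_partition(split_segment_counts: &[usize], split: usize, segment_ord: usize) -> usize {
    assert!(segment_ord < split_segment_counts[split]);
    let offset: usize = split_segment_counts[..split].iter().sum();
    offset + segment_ord
}

fn main() {
    // Three splits with 2, 3, and 1 segments: 6 partitions total.
    let counts = [2, 3, 1];
    assert_eq!(global_partition(&counts, 0, 1), 1);
    assert_eq!(global_partition(&counts, 1, 0), 2);
    assert_eq!(global_partition(&counts, 2, 0), 5);
}
```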

### Real storage backend
All tests use `Index::create_in_ram`. Need integration tests with:
- Split bundles on the local filesystem
- `open_index_with_caches()` in the opener
- Split download + cache warming
Review comment: "of course would not be merged like this, going to need to move around dependencies"