Draft
Changes from all commits (19 commits)
5e0eab0 basic metastore integration and distributed planning (alexanderbianchi, Feb 12, 2026)
a7fa135 integrate quickwit with datafusion via Flight (alexanderbianchi, Feb 13, 2026)
5dd15c3 add basic type coercion (alexanderbianchi, Feb 14, 2026)
d3ac8c0 upgrade prost (alexanderbianchi, Feb 14, 2026)
bfc6492 fix: serialize pushed filters across workers, add StorageSplitOpener (alexanderbianchi, Feb 14, 2026)
f6662dd feat: add QuickwitSchemaProvider catalog for lazy index resolution (alexanderbianchi, Feb 14, 2026)
2aeb96f feat: pass tokenizer manager through StorageSplitOpener (alexanderbianchi, Feb 14, 2026)
d2fa808 fix: remove unnecessary RepartitionExec in join plans (alexanderbianchi, Feb 14, 2026)
85d306c feat: rewrite integration test with real splits + storage + catalog (alexanderbianchi, Feb 14, 2026)
5923249 feat: add warmup for storage-backed splits, fix segment sizes (alexanderbianchi, Feb 14, 2026)
37a228f fix: cache opened Index per split, warmup once not twice (alexanderbianchi, Feb 14, 2026)
6adb196 fix: targeted warmup per DataSource, no over-fetching (alexanderbianchi, Feb 14, 2026)
15491d7 feat: add demo binary showing all three query paths (alexanderbianchi, Feb 18, 2026)
0cb07e5 feat: QuickwitTableProvider uses UnifiedTantivyTableProvider (alexanderbianchi, Feb 18, 2026)
7c8a5ec fix: print physical plans in demo 2 and demo 3 (alexanderbianchi, Feb 18, 2026)
23697aa fix: resolve all cargo warnings (alexanderbianchi, Feb 18, 2026)
85b9b55 feat: demo 2 uses single 'logs' table with distributed plan (alexanderbianchi, Feb 18, 2026)
69c60c4 fix: full_text filter pushdown through QuickwitTableProvider (alexanderbianchi, Feb 18, 2026)
5f3059f fix: demo 3 uses distributed session and shows stage plan (alexanderbianchi, Feb 18, 2026)
477 changes: 477 additions & 0 deletions metastore-distributed-df-design.md

Large diffs are not rendered by default.

1,671 changes: 1,653 additions & 18 deletions quickwit/Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions quickwit/Cargo.toml
@@ -10,6 +10,7 @@ members = [
"quickwit-common",
"quickwit-config",
"quickwit-control-plane",
"quickwit-datafusion",
"quickwit-datetime",
"quickwit-directories",
"quickwit-doc-mapper",
@@ -47,6 +48,7 @@ default-members = [
"quickwit-common",
"quickwit-config",
"quickwit-control-plane",
"quickwit-datafusion",
"quickwit-datetime",
"quickwit-directories",
"quickwit-doc-mapper",
@@ -335,6 +337,7 @@ quickwit-codegen-example = { path = "quickwit-codegen/example" }
quickwit-common = { path = "quickwit-common" }
quickwit-config = { path = "quickwit-config" }
quickwit-control-plane = { path = "quickwit-control-plane" }
quickwit-datafusion = { path = "quickwit-datafusion" }
quickwit-datetime = { path = "quickwit-datetime" }
quickwit-directories = { path = "quickwit-directories" }
quickwit-doc-mapper = { path = "quickwit-doc-mapper" }
@@ -371,6 +374,9 @@ encoding_rs = "=0.8.35"
[patch.crates-io]
sasl2-sys = { git = "https://github.com/quickwit-oss/rust-sasl/", rev = "085a4c7" }

[patch.'https://github.com/quickwit-oss/tantivy/']
tantivy = { path = "/Users/alex.bianchi/oss/tantivy/.worktrees/bianchi/tantivydf" }
@alexanderbianchi (author) commented on Feb 14, 2026:

of course would not be merged like this, going to need to move around dependencies

## this patched version of tracing helps better understand what happens inside futures (when are
## they polled, how long does poll take...)
#tracing = { git = "https://github.com/trinity-1686a/tracing.git", rev = "6806cac3" }
6 changes: 6 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
@@ -300,6 +300,11 @@ pub struct SearcherConfig {
pub storage_timeout_policy: Option<StorageTimeoutPolicy>,
pub warmup_memory_budget: ByteSize,
pub warmup_single_split_initial_allocation: ByteSize,
/// Enable the experimental DataFusion SQL endpoint.
/// When enabled, `POST /api/v1/{index_id}/datafusion` accepts SQL
/// queries and returns Arrow IPC record batches.
#[serde(default)]
pub enable_datafusion_endpoint: bool,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -435,6 +440,7 @@ impl Default for SearcherConfig {
storage_timeout_policy: None,
warmup_memory_budget: ByteSize::gb(100),
warmup_single_split_initial_allocation: ByteSize::gb(1),
enable_datafusion_endpoint: false,
}
}
}
52 changes: 52 additions & 0 deletions quickwit/quickwit-datafusion/Cargo.toml
@@ -0,0 +1,52 @@
[package]
name = "quickwit-datafusion"
description = "Distributed DataFusion execution for Quickwit"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
dashmap = "6"
futures = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
url = "2"

tantivy = { workspace = true }
tantivy-datafusion = { path = "/Users/alex.bianchi/oss/tantivy/.worktrees/bianchi/tantivydf/tantivy-datafusion" }

quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-metastore = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-query = { workspace = true }
quickwit-search = { workspace = true }
quickwit-storage = { workspace = true }

datafusion = "52"
datafusion-datasource = "52"
datafusion-physical-plan = "52"
datafusion-proto = "52"
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed" }
arrow = { version = "57", features = ["prettyprint"] }
arrow-flight = "57"

[dev-dependencies]
datafusion-distributed = { git = "https://github.com/datafusion-contrib/datafusion-distributed", features = ["integration"] }
dashmap = "6"
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true }
quickwit-indexing = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-search = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true }
tokio = { workspace = true, features = ["test-util", "macros"] }
91 changes: 91 additions & 0 deletions quickwit/quickwit-datafusion/TODO-Datafusion.md
@@ -0,0 +1,91 @@
# TODO: Distributed DataFusion Execution

## Plan Quality

### Unnecessary RepartitionExec in join plans
The distributed plans show `RepartitionExec: Hash([_doc_id, _segment_ord], 3)` between every DataSource and its HashJoin. This is wrong — inv, f, and d within a single split are already co-partitioned by segment (they declare `Hash([_doc_id, _segment_ord], N)` partitioning). The optimizer shouldn't add shuffles for co-partitioned joins.

Likely cause: `target_partitions` on the distributed context doesn't match the segment count, so DF thinks it needs to repartition. With `target_partitions=1` locally the plan uses `CollectLeft` mode and no repartitions. Need to either:
- Set `target_partitions` = segment count on the distributed context
- Or teach the distributed planner to respect co-partitioned DataSource declarations
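A minimal sketch of the first option, assuming DataFusion's `SessionConfig` builder API (the `segment_count` plumbing from split metadata is hypothetical):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Sketch: make the distributed context's partition count match the number
// of segments in the split, so both join inputs already declare the same
// Hash([_doc_id, _segment_ord], N) partitioning and no RepartitionExec is
// inserted. `segment_count` is a stand-in for whatever the metastore reports.
fn distributed_session(segment_count: usize) -> SessionContext {
    let config = SessionConfig::new()
        .with_target_partitions(segment_count)
        // Keep DataFusion from adding round-robin repartitions purely for
        // CPU parallelism, which would also break co-partitioning.
        .with_round_robin_repartition(false);
    SessionContext::new_with_config(config)
}
```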

### Explicit UNION ALL vs plan-level decomposition
Currently each split's join plan is written as explicit SQL `UNION ALL`. This means the planner sees N copies of the same join pattern. A better approach:
- `QuickwitTableProvider` registers a single logical table backed by all splits
- The planner produces one join plan
- The distributed optimizer decomposes it by split at the physical level
- Each split's join subtree becomes a task assigned to a worker

This would let df-distributed's `DistributedPhysicalOptimizerRule` handle the split-to-worker mapping natively instead of the SQL manually encoding it.

### CollectLeft vs Partitioned join mode
With `target_partitions=1` the local plan uses `CollectLeft` (broadcast build side) which is correct for the inv⋈f pattern — the inverted index result is small. The distributed plan uses `Partitioned` mode because the context has higher target_partitions. Need to control this so the join strategy matches the data characteristics.

## Worker/Split/Segment Mapping

### How many workers per split? Per segment?
Currently: 1 worker per split. Each split may have multiple segments. Segments within a split are co-partitioned and joined locally on the worker.

Open questions:
- Should a split with many segments be split across multiple workers? (probably not — segments within a split share the same index and need co-partitioned joins)
- Should multiple small splits be assigned to the same worker? (yes — df-distributed's task assignment should batch them)
- How does the coordinator know how many workers are available? (WorkerResolver::get_urls() — need to implement for Quickwit cluster)
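One possible shape for the "multiple small splits per worker" answer is greedy least-loaded packing. This is a self-contained sketch; `Split` and the string worker ids are illustrative stand-ins, not Quickwit or df-distributed types:

```rust
use std::collections::HashMap;

/// Illustrative stand-in for split metadata; not a Quickwit type.
#[derive(Clone, Debug)]
struct Split {
    id: String,
    num_bytes: u64,
}

/// Greedy packing: largest splits first, each split goes to the worker with
/// the smallest byte total so far. Small splits naturally share a worker.
fn assign_splits(mut splits: Vec<Split>, workers: &[String]) -> HashMap<String, Vec<Split>> {
    splits.sort_by(|a, b| b.num_bytes.cmp(&a.num_bytes));
    let mut load: HashMap<String, u64> = workers.iter().map(|w| (w.clone(), 0)).collect();
    let mut assignment: HashMap<String, Vec<Split>> = HashMap::new();
    for split in splits {
        let worker = workers
            .iter()
            .min_by_key(|w| load[*w])
            .expect("at least one worker")
            .clone();
        *load.get_mut(&worker).unwrap() += split.num_bytes;
        assignment.entry(worker).or_default().push(split);
    }
    assignment
}
```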

### Worker discovery
`start_localhost_context` hardcodes localhost workers. Production needs:
- `WorkerResolver` backed by Quickwit's cluster membership (Chitchat)
- Workers = Quickwit searcher nodes
- Split-to-worker affinity based on cache locality
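For the cache-locality affinity, rendezvous (highest-random-weight) hashing is one candidate: each split deterministically prefers one worker, and removing an unrelated worker does not reshuffle existing assignments. A self-contained sketch, where the worker names are illustrative and would come from Chitchat membership in practice:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Pick the worker with the highest hash(split_id, worker) score, so the
/// same split keeps landing on the same searcher's warm cache.
fn worker_for_split(split_id: &str, workers: &[String]) -> Option<String> {
    workers
        .iter()
        .max_by_key(|worker| {
            let mut hasher = DefaultHasher::new();
            split_id.hash(&mut hasher);
            worker.hash(&mut hasher);
            hasher.finish()
        })
        .cloned()
}
```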

## Metastore Integration

### QuickwitTableProvider doesn't query the metastore
Currently takes a hand-built `Vec<Arc<SplitIndexOpener>>`. Production path:
- `QuickwitTableProvider::new(metastore, index_id)`
- At `scan()` time, call `metastore.list_splits()` to discover splits
- Create `SplitIndexOpener` per split from split metadata
- Build per-split tantivy-df providers and compose the plan

### SplitIndexOpener doesn't open real splits
Currently backed by an in-memory `DashMap<String, Index>`. Production path:
- `open()` calls `open_index_with_caches()` from quickwit-search
- Downloads split bundle from object storage (S3/GCS/Azure)
- Opens tantivy index from the local cache or downloaded bundle
- Returns the opened `Index`

## Codec Gaps

### Pushed filters lost across serialization
`FastFieldDataSource` claims `PushedDown::Yes` for all filters, so DF removes the `FilterExec`. But the codec doesn't serialize pushed filters (they're `Arc<dyn PhysicalExpr>`, not trivially serializable). On the worker, the reconstructed DataSource has no filters.

Options:
- Serialize pushed filters via DF's PhysicalExpr proto support
- Change tantivy-df to return `PushedDown::No` so DF keeps FilterExec in the plan (it serializes fine as a built-in node)
- Encode the filter expressions as Expr in the proto and pass to scan()
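A sketch of the first option, assuming datafusion-proto's physical-expression (de)serializers; exact module paths and signatures drift between DataFusion versions, so treat this as a shape, not a drop-in:

```rust
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::FunctionRegistry;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_proto::physical_plan::from_proto::parse_physical_expr;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use prost::Message;

// Coordinator: encode each pushed filter as proto bytes carried inside the
// DataSource's serialized form.
fn encode_pushed_filter(expr: &Arc<dyn PhysicalExpr>) -> Result<Vec<u8>> {
    let codec = DefaultPhysicalExtensionCodec {};
    Ok(serialize_physical_expr(expr, &codec)?.encode_to_vec())
}

// Worker: rebuild the expression against the reconstructed DataSource's
// schema before handing it to scan().
fn decode_pushed_filter(
    bytes: &[u8],
    registry: &dyn FunctionRegistry,
    schema: &arrow::datatypes::Schema,
) -> Result<Arc<dyn PhysicalExpr>> {
    let codec = DefaultPhysicalExtensionCodec {};
    let node = datafusion_proto::protobuf::PhysicalExprNode::decode(bytes)
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    parse_physical_expr(&node, registry, schema, &codec)
}
```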

### Aggregation pushdown not tested in distributed
tantivy-df has `AggPushdown` that replaces `AggregateExec` with `TantivyAggregateExec`. This is a custom ExecutionPlan node that the codec doesn't handle yet. Need to:
- Add `TantivyAggregateExec` encoding to `TantivyCodec`
- Or make it a DataSource variant the existing codec pattern handles
- Test partial aggregation on workers + final merge on coordinator
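A skeleton of the first option, built on datafusion-proto's `PhysicalExtensionCodec` trait. The `from_bytes`/`write_bytes` helpers on `TantivyAggregateExec` are assumptions standing in for whatever tantivy-df actually exposes:

```rust
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;

#[derive(Debug)]
struct TantivyCodec;

impl PhysicalExtensionCodec for TantivyCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &dyn FunctionRegistry,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Dispatch on a type tag written by try_encode; the from_bytes
        // constructor is an assumed tantivy-df helper.
        match buf.first() {
            Some(0) => TantivyAggregateExec::from_bytes(&buf[1..], inputs),
            _ => Err(DataFusionError::Internal("unknown tantivy node".into())),
        }
    }

    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
        if let Some(agg) = node.as_any().downcast_ref::<TantivyAggregateExec>() {
            buf.push(0); // type tag for TantivyAggregateExec
            agg.write_bytes(buf)?; // assumed tantivy-df helper
            return Ok(());
        }
        Err(DataFusionError::Internal("not a tantivy node".into()))
    }
}
```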

## Optimizer Rules in Distributed Context

### tantivy-df optimizer rules not registered on workers
`FastFieldFilterPushdown`, `TopKPushdown`, `AggPushdown`, `OrdinalGroupByOptimization` are registered on the coordinator's session but not on workers. Workers rebuild the plan from the codec, so they'd need these rules if the plan is further optimized on the worker side.

Currently this doesn't matter because the plan is fully optimized on the coordinator before distribution. But if df-distributed ever does worker-side re-optimization, the rules need to be registered in `build_worker_session_builder`.
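If that ever becomes necessary, registering the rules would look roughly like this, assuming DataFusion's `SessionStateBuilder` API; the rule constructors are assumptions about tantivy-df's exports:

```rust
use std::sync::Arc;

use datafusion::execution::session_state::SessionStateBuilder;

// Sketch: mirror the coordinator's rule registration when building worker
// sessions, so any worker-side re-optimization sees the same rules.
fn build_worker_session_builder() -> SessionStateBuilder {
    SessionStateBuilder::new()
        .with_default_features()
        .with_physical_optimizer_rule(Arc::new(FastFieldFilterPushdown::default()))
        .with_physical_optimizer_rule(Arc::new(TopKPushdown::default()))
        .with_physical_optimizer_rule(Arc::new(AggPushdown::default()))
        .with_physical_optimizer_rule(Arc::new(OrdinalGroupByOptimization::default()))
}
```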

## Testing

### Multi-segment splits
All current tests use single-segment splits (one commit). Need tests with multi-segment splits to verify:
- Segment-level co-partitioning in joins (N partitions per split)
- Correct partition mapping across inv/f/d providers
- Chunking behavior with target_partitions > segment count
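Producing a multi-segment in-RAM index for such tests can be sketched with plain tantivy, assuming merges are disabled so each commit survives as its own segment:

```rust
use tantivy::merge_policy::NoMergePolicy;
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index};

// Sketch: commit several times with merges disabled, so joins over the
// split see N > 1 co-partitioned segments.
fn multi_segment_index() -> tantivy::Result<Index> {
    let mut schema_builder = Schema::builder();
    let body = schema_builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let mut writer = index.writer(50_000_000)?;
    writer.set_merge_policy(Box::new(NoMergePolicy));
    for segment in 0..3 {
        writer.add_document(doc!(body => format!("doc in segment {segment}")))?;
        writer.commit()?; // each commit seals one segment
    }
    assert_eq!(index.reader()?.searcher().segment_readers().len(), 3);
    Ok(index)
}
```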

### Real storage backend
All tests use `Index::create_in_ram`. Need integration tests with:
- Split bundles on local filesystem
- `open_index_with_caches()` in the opener
- Split download + cache warming
Loading