Consolidate filters and projections onto TableScan #20061
Conversation
// Wrap with ProjectionExec if projection is present and differs from scan output
// (either non-identity, or fewer columns due to filter-only columns)
The idea for #19387 is that we might be able to push down trivial expressions here, thus avoiding the need for any physical optimizer changes/rules.
LogicalPlan::Filter(filter) => {
    // Split AND predicates into individual expressions
    filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
}
Maybe we can drop this since filters are effectively pushed into TableScan now?
@kosiew would you be open to reviewing this?
kosiew left a comment
Found an issue and am stopping here, as there are other CI issues.
LogicalPlan::TableScan(scan) => {
    // Also extract filters from TableScan (where they may be pushed down)
    filters.extend(scan.filters.iter().cloned());
I think this would be a problem in scenarios like `UPDATE target FROM source ...`, where the input plan contains TableScan nodes for both target and source. This function will extract filters from both tables and attempt to apply them to the target table (after stripping qualifiers).
Hmm interesting. Is the problem the stripping of qualifiers? It seems to me that the current implementation is generally fragile: we should be taking into account the join aspect and only collect from subtrees of the join. In particular:
> explain format indent UPDATE "trg" SET col = 1 FROM src WHERE trg.id = src.id AND src.type = 'active' AND trg.id > 100;
+---------------+-----------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------+
| logical_plan | Dml: op=[Update] table=[trg] |
| | Projection: trg.id AS id, Utf8View("1") AS col |
| | Inner Join: trg.id = src.id |
| | Filter: trg.id > Int32(100) |
| | TableScan: trg projection=[id] |
| | Projection: src.id |
| | Filter: src.type = Utf8View("active") AND src.id > Int32(100) |
| | TableScan: src projection=[id, type] |
+---------------+-----------------------------------------------------------------------+
It seems to me that we should do something like:
- Find the table scan corresponding to the table being updated.
- Walk up the tree until we hit a join / subquery (?) / other blocker.
- Collect all filters in that subtree (ideally, once this PR is across the line, they've all been pushed into the `TableScan`, so that becomes trivial)
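A toy sketch of that walk, using a simplified plan enum (all names here are hypothetical stand-ins, not DataFusion's actual `LogicalPlan` API): the key idea is that a join "seals" the collected filters, so predicates above it are no longer attributed to the target scan.

```rust
// Simplified stand-ins for logical plan nodes (hypothetical, for illustration).
enum Plan {
    Scan { table: String, filters: Vec<String> },
    Filter { pred: String, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

enum Found {
    No,                  // target scan not in this subtree
    Open(Vec<String>),   // target found; still collecting filters on the way up
    Sealed(Vec<String>), // target found, but a join blocks further collection
}

fn collect(plan: &Plan, target: &str) -> Found {
    match plan {
        Plan::Scan { table, filters } if table == target => Found::Open(filters.clone()),
        Plan::Scan { .. } => Found::No,
        Plan::Filter { pred, input } => match collect(input, target) {
            Found::Open(mut f) => {
                f.push(pred.clone());
                Found::Open(f)
            }
            other => other,
        },
        Plan::Join { left, right } => match collect(left, target) {
            Found::No => match collect(right, target) {
                Found::No => Found::No,
                Found::Open(f) | Found::Sealed(f) => Found::Sealed(f),
            },
            Found::Open(f) | Found::Sealed(f) => Found::Sealed(f),
        },
    }
}
```

On the `trg`/`src` plan above, this would attribute `trg.id > 100` to `trg` while ignoring the join predicate and everything in the `src` branch.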
I don't know how the DML stuff is supposed to handle more complex cases involving EXISTS, etc.
> I don't know how the DML stuff is supposed to handle more complex cases involving EXISTS, etc.
E.g.:
-- 1. Setup: Create tables
CREATE TEMP TABLE departments (
dept_id INT,
dept_name TEXT,
is_active BOOLEAN
);
CREATE TEMP TABLE employees (
emp_id SERIAL PRIMARY KEY,
name TEXT,
dept_name TEXT,
salary INT
);
-- 2. Seed Data
INSERT INTO departments VALUES (1, 'Engineering', true), (2, 'Marketing', false);
INSERT INTO employees (name, dept_name, salary) VALUES
('Alice', 'Engineering', 5000),
('Bob', 'Engineering', 5000),
('Charlie', 'Marketing', 4000);
-- 3. Execute the UPDATE with a subquery in the WHERE clause
-- Goal: Give a 10% raise only to people in the 'Engineering' department
UPDATE employees
SET salary = salary * 1.10
WHERE dept_name = (
SELECT dept_name
FROM departments
WHERE dept_id = 1
LIMIT 1
);
-- 4. Verify the results
SELECT * FROM employees;
This seems to work in Postgres.
But again I'm not sure if this is directly something this PR should handle.
Thank you, I will look into this. I'm struggling with the CI issue: https://github.com/apache/datafusion/actions/runs/21495116091/job/61927800829?pr=20061#step:4:7993 It seems like the only diff is the inclusion of a backtrace. I can reproduce locally if I run with
Fixed now. The issue was that we can't push
@ethan-tyler any chance you can take a look at this as well?
This PR changes `TableScan.projection` from `Option<Vec<usize>>` (column indices) to `Option<Vec<Expr>>` (column expressions). This is the first step toward consolidating projection and filter expressions directly in the TableScan node.

## Motivation

Currently, TableScan stores projections as column indices, which requires constant conversion between indices and expressions throughout the codebase. By storing expressions directly, we:

1. Simplify the data model - projections are naturally expressions
2. Enable future consolidation of filter expressions into projections
3. Reduce conversion overhead in optimization passes

This is the first of two PRs splitting PR apache#20061, which consolidates both projections and filters. This PR focuses solely on the projection type change to make the refactoring easier to review.

## Changes

- Changed `TableScan.projection` type from `Option<Vec<usize>>` to `Option<Vec<Expr>>`
- Added `TableScanBuilder` for constructing TableScan nodes with expression-based projections
- Added `projection_indices_from_exprs()` helper to convert expressions back to indices when needed
- Updated `TableScan::try_new()` to accept indices (for backward compatibility) and convert them to expressions internally
- Updated optimize_projections, physical_planner, proto serialization, substrait conversion, and SQL unparser to work with the new type

## Related Issues / PRs

- Split from apache#20061
- Enables future work on filter consolidation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
ethan-tyler left a comment
Nice work!! Moving scan semantics out of the tree and letting physical planning own the pushdown decisions makes a lot of sense as a direction.
I did find a few things that I think need fixing and some smaller stuff worth looking at.
.iter()
.chain(new_scan_filters)
.chain(pushable_filters)
.unique()
This refactor changes what reaches `.unique()`. Volatile predicates can now land in `TableScan.filters`, and the dedup collapses repeated evaluations: `random() < 0.5 AND random() < 0.5` is not equivalent to `random() < 0.5`. I would either drop the dedup, or restrict it to `!expr.is_volatile()` and keep volatile predicates in order.
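A sketch of the restricted dedup, with a toy predicate type standing in for `Expr` (the `volatile` flag plays the role of `is_volatile()`; names are illustrative only):

```rust
use std::collections::HashSet;

// Toy stand-in for a predicate expression; `volatile` models `is_volatile()`.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Pred {
    text: String,
    volatile: bool,
}

// Dedup only deterministic predicates. Every occurrence of a volatile
// predicate (e.g. `random() < 0.5`) is kept, and overall order is preserved.
fn dedup_filters(filters: Vec<Pred>) -> Vec<Pred> {
    let mut seen = HashSet::new();
    filters
        .into_iter()
        .filter(|p| p.volatile || seen.insert(p.clone()))
        .collect()
}
```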
/// Column qualifiers are stripped so expressions can be evaluated against
/// the TableProvider's schema.
///
fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
In `UPDATE t1 SET ... FROM t2 WHERE t2.x = 1 AND t1.id = t2.id`, `t2.x` becomes `x` and can accidentally match `t1.x`; join predicates degrade into `id = id`. I would only pass predicates that are provably evaluable against the target table schema alone before stripping qualifiers, and keep the rest outside the provider call.
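A minimal sketch of that guard, modeling a predicate as just the list of qualified columns it references (hypothetical types, not the real `Expr`/`Column`):

```rust
// (qualifier, column name) pairs referenced by a predicate.
type ColRef = (Option<String>, String);

// A predicate may be handed to the target table's provider only if every
// column it references resolves against the target table alone; anything
// touching another relation (e.g. `t2.x`, or `t1.id = t2.id`) stays outside.
fn evaluable_against_target(cols: &[ColRef], target: &str) -> bool {
    cols.iter().all(|(qualifier, _)| match qualifier {
        Some(table) => table == target,
        None => true, // unqualified columns are assumed to bind to the target
    })
}
```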
/// over the source table. This function extracts column name and expression pairs
/// from the projection. Column qualifiers are stripped from the expressions.
///
fn extract_update_assignments(input: &Arc<LogicalPlan>) -> Result<Vec<(String, Expr)>> {
The same qualifier-stripping problem happens here: `extract_update_assignments` strips qualifiers from the RHS, so `SET b = t2.b` can become `b = b` and get treated as an identity assignment.
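One way to make the identity check qualifier-aware, as a sketch with hypothetical signatures (the real code would work on `Expr`/`Column` values rather than strings):

```rust
// `SET <col> = <rhs>` is only an identity assignment when the RHS is the
// target table's own column, qualifier included; `SET b = t2.b` is not.
fn is_identity_assignment(
    target_table: &str,
    set_col: &str,
    rhs_qualifier: Option<&str>,
    rhs_col: &str,
) -> bool {
    set_col == rhs_col && rhs_qualifier.map_or(true, |q| q == target_table)
}
```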
if indices.is_empty() {
    Ok(None)
} else {
    Ok(Some(indices))
}
}
When the required column set is empty (e.g. `SELECT 1 FROM wide_table`), this returns `None`, which makes it scan all columns. IMO this is the opposite of what we want; I would return `Some(vec![])`, or a single cheap column if empty projections aren't provider-safe.
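Sketch of the suggested behavior (the `supports_empty_projection` flag and the index-0 fallback are hypothetical; providers differ in what they accept):

```rust
// Return Some(vec![]) for an empty required-column set so the provider reads
// no columns (row counts only), instead of None, which means "all columns".
fn projection_for(mut indices: Vec<usize>, supports_empty_projection: bool) -> Option<Vec<usize>> {
    indices.sort_unstable();
    indices.dedup();
    if indices.is_empty() && !supports_empty_projection {
        // Fall back to a single cheap column (index 0 as a placeholder).
        return Some(vec![0]);
    }
    Some(indices)
}
```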
@@ -1023,11 +1023,10 @@ impl AsLogicalPlan for LogicalPlanNode {
let projection = match projection {
Projections are serialized as column name strings and deserialized by schema index lookup. Non-column expressions won't survive the roundtrip, and even column names with dots or quoting differences are fragile. I think the cleanest path is enforcing column-ref-only projections in the builder and failing fast on anything else. Extending proto to `Vec<LogicalExprNode>` is the alternative but a bigger lift.
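The fail-fast option could look roughly like this (toy `Expr`, not DataFusion's; real code would match on `Expr::Column` during serialization):

```rust
// Toy expression type standing in for the real Expr enum.
#[derive(Debug)]
enum Expr {
    Column(String),
    Literal(i64),
}

// Name-based proto encoding can only round-trip plain column references;
// reject anything else up front instead of silently corrupting the plan.
fn serializable_projection(exprs: &[Expr]) -> Result<Vec<String>, String> {
    exprs
        .iter()
        .map(|e| match e {
            Expr::Column(name) => Ok(name.clone()),
            other => Err(format!("projection expr is not a column reference: {other:?}")),
        })
        .collect()
}
```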
I think what we need here is to extend the proto to `Vec<LogicalExprNode>`...
let mut builder = LogicalPlanBuilder::scan(
// Use TableScanBuilder to preserve full projection expressions
let mut scan_builder = TableScanBuilder::new(
This builder chain doesn't carry over `table_scan.fetch`; it needs a `.with_fetch(table_scan.fetch)`, or plans with a fetch pushed into the scan will lose the LIMIT in the generated SQL.
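A toy builder showing the missing hand-off (these types are hypothetical stand-ins for the PR's `TableScanBuilder`, just to illustrate that `fetch` must be threaded through explicitly):

```rust
#[derive(Default, Debug, PartialEq)]
struct ScanPlan {
    fetch: Option<usize>,
}

#[derive(Default)]
struct ScanBuilder {
    fetch: Option<usize>,
}

impl ScanBuilder {
    // Without this call during unparsing, a LIMIT that was pushed into the
    // scan is silently dropped from the rebuilt plan.
    fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    fn build(self) -> ScanPlan {
        ScanPlan { fetch: self.fetch }
    }
}
```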
// Convert column names to indices
let mut column_indices: Vec<usize> = required_columns
    .iter()
    .filter_map(|col| table_schema.index_of(col.name()).ok())
    .collect();
column_indices.sort();
column_indices.dedup();
Field references are built against the full table `DFSchema`, but the `ReadRel` uses a column subset mask. If expression projections stay, the indices won't line up. This goes away if projections are restricted to column refs.
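The mismatch is the usual full-schema-index vs. mask-position remap; a minimal sketch (function name is illustrative):

```rust
// A field reference built against the full table schema must be remapped
// onto its position within the projected column mask before being emitted.
// Returns None if the field isn't in the mask at all (a planning bug).
fn remap_to_mask(full_index: usize, mask: &[usize]) -> Option<usize> {
    mask.iter().position(|&i| i == full_index)
}
```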
Related issues
Closes #19894. I think this will help with #19387 as well.