@adriangb (Contributor):

Related issues

Closes #19894. I think this will also help with #19387.

@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate proto Related to proto crate labels Jan 29, 2026
Comment on lines +1757 to +1758:

```rust
// Wrap with ProjectionExec if projection is present and differs from scan output
// (either non-identity, or fewer columns due to filter-only columns)
```
@adriangb (Contributor, Author):

The idea for #19387 is that we might be able to push down trivial expressions here, thus avoiding the need for any physical optimizer changes/rules.

Comment on lines +2166 to +2170:

```rust
LogicalPlan::Filter(filter) => {
    // Split AND predicates into individual expressions
    filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
}
```
@adriangb (Contributor, Author):

Maybe we can drop this since filters are effectively pushed into TableScan now?

@adriangb adriangb marked this pull request as ready for review January 29, 2026 21:21
@adriangb (Contributor, Author):

@kosiew would you be open to reviewing this?

@kosiew (Contributor) left a comment:

Found an issue and stopping as there are other CI issues.

Comment on lines +2123 to +2125:

```rust
LogicalPlan::TableScan(scan) => {
    // Also extract filters from TableScan (where they may be pushed down)
    filters.extend(scan.filters.iter().cloned());
```
@kosiew (Contributor):

I think this would be a problem in scenarios like `UPDATE target FROM source ...` where the input plan contains TableScan nodes for both target and source. This function will extract filters from both tables and attempt to apply them to the target table (after stripping qualifiers).

@adriangb (Contributor, Author):

Hmm, interesting. Is the problem the stripping of qualifiers? It seems to me that the current implementation is generally fragile: we should take the join into account and only collect filters from the join subtree that contains the target. In particular:

```text
> explain format indent UPDATE "trg" SET col = 1 FROM src WHERE trg.id = src.id AND src.type = 'active' AND trg.id > 100;
+---------------+-----------------------------------------------------------------------+
| plan_type     | plan                                                                  |
+---------------+-----------------------------------------------------------------------+
| logical_plan  | Dml: op=[Update] table=[trg]                                          |
|               |   Projection: trg.id AS id, Utf8View("1") AS col                      |
|               |     Inner Join: trg.id = src.id                                       |
|               |       Filter: trg.id > Int32(100)                                     |
|               |         TableScan: trg projection=[id]                                |
|               |       Projection: src.id                                              |
|               |         Filter: src.type = Utf8View("active") AND src.id > Int32(100) |
|               |           TableScan: src projection=[id, type]                        |
```

It seems to me that we should do something like the following (rough sketch after the list):

  1. Find the table scan corresponding to the table being updated.
  2. Walk up the tree until we hit a join / subquery (?) / other blocker.
  3. Collect all filters in that subtree (ideally, once this PR is across the line, they've all been pushed into the TableScan and this becomes trivial).
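A rough sketch of that walk (a hypothetical helper, not code from this PR; assumes the usual `LogicalPlan` variants and `datafusion_expr::utils::split_conjunction`):

```rust
use datafusion_common::TableReference;
use datafusion_expr::{utils::split_conjunction, Expr, LogicalPlan};

/// Collect filters that apply to the DML target's scan; the bool marks that
/// a join blocker was crossed, after which ancestors stop contributing.
fn collect_target_filters(
    plan: &LogicalPlan,
    target: &TableReference,
) -> Option<(Vec<Expr>, bool)> {
    match plan {
        // Step 1: the scan for the table being updated. Once this PR lands,
        // pushed-down predicates already live in `scan.filters`.
        LogicalPlan::TableScan(scan) if scan.table_name == *target => {
            Some((scan.filters.clone(), false))
        }
        // Step 3: a Filter between the target scan and the first blocker
        // contributes its conjuncts; one above a blocker does not.
        LogicalPlan::Filter(filter) => {
            let (mut out, blocked) = collect_target_filters(&filter.input, target)?;
            if !blocked {
                out.extend(split_conjunction(&filter.predicate).into_iter().cloned());
            }
            Some((out, blocked))
        }
        // Step 2: joins (and, similarly, subqueries) act as blockers: locate
        // the target on either side, then flag the result so collection stops.
        LogicalPlan::Join(join) => {
            let (out, _) = collect_target_filters(&join.left, target)
                .or_else(|| collect_target_filters(&join.right, target))?;
            Some((out, true))
        }
        // Treat remaining nodes as transparent while searching for the target.
        _ => plan
            .inputs()
            .into_iter()
            .find_map(|child| collect_target_filters(child, target)),
    }
}
```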

I don't know how the DML stuff is supposed to handle more complex cases involving EXISTS, etc.

@adriangb (Contributor, Author):

> I don't know how the DML stuff is supposed to handle more complex cases involving EXISTS, etc.

E.g.:

```sql
-- 1. Setup: Create tables
CREATE TEMP TABLE departments (
    dept_id INT,
    dept_name TEXT,
    is_active BOOLEAN
);

CREATE TEMP TABLE employees (
    emp_id SERIAL PRIMARY KEY,
    name TEXT,
    dept_name TEXT,
    salary INT
);

-- 2. Seed Data
INSERT INTO departments VALUES (1, 'Engineering', true), (2, 'Marketing', false);

INSERT INTO employees (name, dept_name, salary) VALUES
('Alice', 'Engineering', 5000),
('Bob', 'Engineering', 5000),
('Charlie', 'Marketing', 4000);

-- 3. Execute the UPDATE with a subquery in the WHERE clause
-- Goal: Give a 10% raise only to people in the 'Engineering' department
UPDATE employees
SET salary = salary * 1.10
WHERE dept_name = (
    SELECT dept_name
    FROM departments
    WHERE dept_id = 1
    LIMIT 1
);

-- 4. Verify the results
SELECT * FROM employees;
```

This seems to work in postgres.

But again I'm not sure if this is directly something this PR should handle.

@adriangb (Contributor, Author):

> Found an issue and stopping as there are other CI issues.

Thank you, I will look into this.

I'm struggling with the CI issue: https://github.com/apache/datafusion/actions/runs/21495116091/job/61927800829?pr=20061#step:4:7993

It seems like the only diff is the inclusion of a backtrace. I can reproduce locally if I run with RUST_BACKTRACE=1 but without it no failure. Which is confusing...

Fixed now. The issue was that we can't push `Expr::ScalarSubquery` into `TableScan`.

@adriangb (Contributor, Author):

@ethan-tyler any chance you can take a look at this as well?

adriangb added a commit to pydantic/datafusion that referenced this pull request Jan 31, 2026
This PR changes `TableScan.projection` from `Option<Vec<usize>>` (column
indices) to `Option<Vec<Expr>>` (column expressions). This is the first
step toward consolidating projection and filter expressions directly in
the TableScan node.

## Motivation

Currently, TableScan stores projections as column indices, which requires
constant conversion between indices and expressions throughout the
codebase. By storing expressions directly, we:

1. Simplify the data model - projections are naturally expressions
2. Enable future consolidation of filter expressions into projections
3. Reduce conversion overhead in optimization passes

This is the first of two PRs splitting PR apache#20061, which consolidates
both projections and filters. This PR focuses solely on the projection
type change to make the refactoring easier to review.

## Changes

- Changed `TableScan.projection` type from `Option<Vec<usize>>` to
  `Option<Vec<Expr>>`
- Added `TableScanBuilder` for constructing TableScan nodes with
  expression-based projections
- Added `projection_indices_from_exprs()` helper to convert expressions
  back to indices when needed
- Updated `TableScan::try_new()` to accept indices (for backward
  compatibility) and convert them to expressions internally
- Updated optimize_projections, physical_planner, proto serialization,
  substrait conversion, and SQL unparser to work with the new type

## Related Issues / PRs

- Split from apache#20061
- Enables future work on filter consolidation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@ethan-tyler (Contributor) left a comment:

Nice work!! Moving scan semantics out of the tree and letting physical planning own the pushdown decisions makes a lot of sense as a direction.

I did find a few things that I think need fixing and some smaller stuff worth looking at.

```rust
.iter()
.chain(new_scan_filters)
.chain(pushable_filters)
.unique()
```
@ethan-tyler (Contributor):

This refactor changes what reaches `.unique()`. Volatile predicates can now land in `TableScan.filters`, and dedup collapses repeated evaluation: `random() < 0.5 AND random() < 0.5` is not equivalent to `random() < 0.5`. I would either drop the dedup, or restrict it to `!expr.is_volatile()` and keep volatiles in order.
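One shape that could take (untested sketch; assumes the combined predicate list is in `all_filters` and that `itertools::Itertools` is in scope, as it already is for `.unique()`):

```rust
// Dedup only the non-volatile predicates; volatile ones keep their original
// order and multiplicity, since collapsing them changes semantics.
let (volatile, stable): (Vec<Expr>, Vec<Expr>) =
    all_filters.into_iter().partition(|e| e.is_volatile());
let mut filters: Vec<Expr> = stable.into_iter().unique().collect();
filters.extend(volatile);
```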

```rust
/// Column qualifiers are stripped so expressions can be evaluated against
/// the TableProvider's schema.
///
fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
```
@ethan-tyler (Contributor):

In `UPDATE t1 SET ... FROM t2 WHERE t2.x = 1 AND t1.id = t2.id`, `t2.x` becomes `x` and can accidentally match `t1.x`; join predicates degrade into `id = id`. I would only pass predicates that are provably evaluable against the target table schema alone before stripping qualifiers, and keep the rest outside the provider call.
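Untested sketch of that check (`candidate_filters`, `target_table`, and `target_schema` are assumed names, not from this PR; uses `Expr::column_refs`):

```rust
// Keep only predicates whose column references all resolve against the
// target table alone, *before* any qualifier stripping happens.
let evaluable: Vec<Expr> = candidate_filters
    .into_iter()
    .filter(|expr| {
        expr.column_refs().iter().all(|col| {
            let qualifier_ok = col
                .relation
                .as_ref()
                .map_or(true, |rel| rel.resolved_eq(&target_table));
            qualifier_ok && target_schema.has_column_with_unqualified_name(&col.name)
        })
    })
    .collect();
```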

```rust
/// over the source table. This function extracts column name and expression pairs
/// from the projection. Column qualifiers are stripped from the expressions.
///
fn extract_update_assignments(input: &Arc<LogicalPlan>) -> Result<Vec<(String, Expr)>> {
```
@ethan-tyler (Contributor):

The same qualifier-stripping problem happens here: `extract_update_assignments` strips qualifiers from the RHS, so `SET b = t2.b` can become `b = b` and get treated as an identity assignment.
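A hypothetical helper, just to illustrate the check that seems needed:

```rust
// Only treat `SET col = <expr>` as an identity assignment when the RHS is a
// bare column with the same name whose qualifier (if any) refers to the
// target table itself; `t2.b` must never collapse into `b = b`.
fn is_identity_assignment(name: &str, rhs: &Expr, target: &TableReference) -> bool {
    match rhs {
        Expr::Column(col) => {
            col.name == name
                && col
                    .relation
                    .as_ref()
                    .map_or(true, |rel| rel.resolved_eq(target))
        }
        _ => false,
    }
}
```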

Comment on lines +1798 to +1803:

```rust
    if indices.is_empty() {
        Ok(None)
    } else {
        Ok(Some(indices))
    }
}
```
@ethan-tyler (Contributor):

When the required column set is empty (e.g. `SELECT 1 FROM wide_table`), this returns `None`, which makes the scan read all columns. IMO that is the opposite of what we want; I would return `Some(vec![])`, or a single cheap column if empty projections aren't provider-safe.
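i.e. roughly:

```rust
// An empty required-column set should still produce an explicit projection:
// `Some(vec![])` lets the provider prune every column for queries like
// `SELECT 1 FROM wide_table`, instead of falling back to a full scan.
if indices.is_empty() {
    Ok(Some(vec![]))
} else {
    Ok(Some(indices))
}
```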

```diff
@@ -1023,11 +1023,10 @@ impl AsLogicalPlan for LogicalPlanNode {

 let projection = match projection {
```
@ethan-tyler (Contributor):

Projections are serialized as column name strings and deserialized by schema index lookup. Non-column expressions won't survive the roundtrip, and even column names with dots or quoting differences are fragile. I think the cleanest path is enforcing column-ref-only projections in the builder and failing fast on anything else. Extending the proto to `Vec<LogicalExprNode>` is the alternative, but a bigger lift.

@adriangb (Contributor, Author):

I think what we need here is to extend the proto to `Vec<LogicalExprNode>`...


```diff
-let mut builder = LogicalPlanBuilder::scan(
+// Use TableScanBuilder to preserve full projection expressions
+let mut scan_builder = TableScanBuilder::new(
```
@ethan-tyler (Contributor):

This builder chain doesn't carry over `table_scan.fetch`; it needs a `.with_fetch(table_scan.fetch)`, or plans with a fetch pushed into the scan will lose their LIMIT in the generated SQL.

Comment on lines +170 to +176:

```rust
// Convert column names to indices
let mut column_indices: Vec<usize> = required_columns
    .iter()
    .filter_map(|col| table_schema.index_of(col.name()).ok())
    .collect();
column_indices.sort();
column_indices.dedup();
```
@ethan-tyler (Contributor):

Field references are built against the full table `DFSchema`, but the `ReadRel` uses a column-subset mask. If expression projections stay, the indices won't line up. This goes away if projections are restricted to column refs.
