-
Notifications
You must be signed in to change notification settings - Fork 1.9k
refactor: Change TableScan.projection from indices to expressions #20091
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This PR changes `TableScan.projection` from `Option<Vec<usize>>` (column indices) to `Option<Vec<Expr>>` (column expressions). This is the first step toward consolidating projection and filter expressions directly in the TableScan node. ## Motivation Currently, TableScan stores projections as column indices which requires constant conversion between indices and expressions throughout the codebase. By storing expressions directly, we: 1. Simplify the data model - projections are naturally expressions 2. Enable future consolidation of filter expressions into projections 3. Reduce conversion overhead in optimization passes This is the first of two PRs splitting PR apache#20061, which consolidates both projections and filters. This PR focuses solely on the projection type change to make the refactoring easier to review. ## Changes - Changed `TableScan.projection` type from `Option<Vec<usize>>` to `Option<Vec<Expr>>` - Added `TableScanBuilder` for constructing TableScan nodes with expression-based projections - Added `projection_indices_from_exprs()` helper to convert expressions back to indices when needed - Updated `TableScan::try_new()` to accept indices (for backward compatibility) and convert them to expressions internally - Updated optimize_projections, physical_planner, proto serialization, substrait conversion, and SQL unparser to work with the new type ## Related Issues / PRs - Split from apache#20061 - Enables future work on filter consolidation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…dices Change the return type from Option<Vec<usize>> to (Option<Vec<Expr>>, Vec<usize>) where: - First element is the remainder projection to apply on top of the scan output (None if projection is all simple column references) - Second element is the column indices to scan For pure column projections, indices preserve projection order (including duplicates) so no remainder projection is needed. For expression projections, indices are deduplicated/sorted for efficiency, and the original expressions are returned as remainder. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Remove separate is_simple_column_projection check and instead use a single loop that tracks whether any complex expressions are encountered via a bool flag. At the end, use the flag to decide whether to return Some or None for the remainder projection. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
| /// This builder provides a flexible way to construct `TableScan` nodes, | ||
| /// particularly when working with expression-based projections directly. | ||
| #[derive(Clone)] | ||
| pub struct TableScanBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit of refactor that while not strictly necessary for this PR I think will be helpful for future (and makes the code in this PR nicer).
| /// # Returns | ||
| /// * `Some(Vec<usize>)` - If all expressions are column references found in the schema | ||
| /// * `None` - If any expression is not a column reference or not found in schema | ||
| pub fn projection_indices_from_exprs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not super sold on this function. I feel there is a footgun where you don't know if None means complex expressions (maybe you forgot to check for them?) or if None means a column was not found in the schema (also something that maybe should never happen?).
adriangb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self review
| session_state, | ||
| )?; | ||
| } else { | ||
| // Mixed: push non-async expressions + columns needed by async, keep async on top |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to make sure we have test coverage that hits this. Generally we should run a full coverage check.
| // Wrap with ProjectionExec if remainder projection is needed | ||
| if let Some(ref proj_exprs) = remainder_projection { | ||
| let scan_output_schema = plan.schema(); | ||
| let scan_df_schema = DFSchema::try_from(scan_output_schema.as_ref().clone())?; | ||
| let unnormalized_proj_exprs: Vec<Expr> = | ||
| unnormalize_cols(proj_exprs.iter().cloned()); | ||
|
|
||
| // Classify expressions as async or non-async |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if there's a better way to handle this. Async UDFs seem to be a bit of a pain. I wonder if there's some existing helpers, etc. If not I think I can also encapsulate this pattern of "split the projection into a top remainder and inner one that matches a closure" since I see us using it a lot.
| let num_input_columns = input.schema().fields().len(); | ||
| let input_schema = input.schema(); | ||
|
|
||
| match self.try_plan_async_exprs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to investigate how try_plan_async_exprs and how it overlaps with the projection splitting above.
| col.name.clone() | ||
| } else { | ||
| e.schema_name().to_string() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this is in the Display impl
| @@ -2807,14 +2829,129 @@ impl TableScan { | |||
| Ok(Self { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wonder if we could ruse TableScanBuilder here?
| let projection_exec = | ||
| ProjectionExec::try_new(proj_exprs, Arc::clone(&plan))?; | ||
|
|
||
| match plan.try_swapping_with_projection(&projection_exec)? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might move this out of this PR. The idea is that once we have projections in TableScan then the projection pushdown optimizer can handle pushing down get_field expressions all the way into TableScan (past any filters) and then we can here push those into the scan itself (if the scan accepts them).
|
@ethan-tyler this is another PR of interest for the |
Motivation
This PR was split from #20061 (which consolidates both projections and filters) and enables future work on #19538 and #19387.
The idea here is that:
get_field(...)expressions into TableScan and then when we do logical -> physical planning we can immediately calltry_swapping_with_projectionon theExecutionPlanthatTableProvider::scan_with_argsreturns. Thus ensuring we push down theget_field(...)projections into the physical scan without needing a phyiscal optimizer rule that is also aware of expression placement, is implemented for each execuiton plan, etc.Changes
Core Type Change
TableScan.projectionfromOption<Vec<usize>>toOption<Vec<Expr>>Expr::ColumnreferenceNew APIs
TableScanBuilderfor constructingTableScannodes with expression-based projections directlyprojection_indices_from_exprs()helper inutils.rsto convert expressions back to indices when needed (for physical planning and serialization)Backward Compatibility
TableScan::try_new()still acceptsOption<Vec<usize>>and converts indices to expressions internallyLogicalPlanBuilder::scan*methods still accept indices for backward compatibilityUpdated Components
TableScanBuilderScanArgsTest Plan
cargo checkpasses for all affected cratescargo test -p datafusion-exprpassescargo test -p datafusion-optimizerpasses🤖 Generated with Claude Code