Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 105 additions & 20 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::utils::{split_conjunction, split_projection};
use datafusion_expr::{
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
Expand Down Expand Up @@ -455,25 +455,8 @@ impl DefaultPhysicalPlanner {
) -> Result<Arc<dyn ExecutionPlan>> {
let exec_node: Arc<dyn ExecutionPlan> = match node {
// Leaves (no children)
LogicalPlan::TableScan(TableScan {
source,
projection,
filters,
fetch,
..
}) => {
let source = source_as_provider(source)?;
// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
let filters_vec = filters.into_iter().collect::<Vec<_>>();
let opts = ScanArgs::default()
.with_projection(projection.as_deref())
.with_filters(Some(&filters_vec))
.with_limit(*fetch);
let res = source.scan_with_args(session_state, opts).await?;
Arc::clone(res.plan())
LogicalPlan::TableScan(scan) => {
self.plan_table_scan(scan, session_state).await?
}
LogicalPlan::Values(Values { values, schema }) => {
let exprs = values
Expand Down Expand Up @@ -1725,6 +1708,108 @@ impl DefaultPhysicalPlanner {
))
}
}

/// Plan a TableScan node, wrapping with ProjectionExec as needed.
///
/// This method handles projection pushdown by:
/// 1. Computing which columns the scan needs to produce
/// 2. Creating the scan with minimal required columns
/// 3. Applying any remainder projection (for complex expressions)
async fn plan_table_scan(
&self,
scan: &TableScan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let provider = source_as_provider(&scan.source)?;
let source_schema = scan.source.schema();

// Remove qualifiers from filters
let filters: Vec<Expr> = unnormalize_cols(scan.filters.iter().cloned());

// Compute required column indices and remainder projection
let split = split_projection(&scan.projection, &source_schema)?;

// Create the scan
let scan_args = ScanArgs::default()
.with_projection(split.column_indices.as_deref())
.with_filters(if filters.is_empty() {
None
} else {
Some(&filters)
})
.with_limit(scan.fetch);

let scan_result = provider.scan_with_args(session_state, scan_args).await?;
let mut plan: Arc<dyn ExecutionPlan> = Arc::clone(scan_result.plan());

// Wrap with ProjectionExec if remainder projection is needed
if let Some(ref proj_exprs) = split.remainder {
let scan_df_schema = DFSchema::try_from(plan.schema().as_ref().clone())?;
let unnormalized_proj_exprs: Vec<Expr> =
unnormalize_cols(proj_exprs.iter().cloned());
plan = self.create_projection_exec(
&unnormalized_proj_exprs,
plan,
&scan_df_schema,
session_state,
)?;
}

Ok(plan)
}

/// Creates a ProjectionExec from logical expressions, handling async UDF expressions.
///
/// If the expressions contain async UDFs, wraps them with `AsyncFuncExec`.
fn create_projection_exec(
&self,
exprs: &[Expr],
input: Arc<dyn ExecutionPlan>,
input_dfschema: &DFSchema,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let physical_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = exprs
.iter()
.map(|e| {
let physical =
self.create_physical_expr(e, input_dfschema, session_state)?;
let name = e.schema_name().to_string();
Ok((physical, name))
})
.collect::<Result<Vec<_>>>()?;

let num_input_columns = input.schema().fields().len();
let input_schema = input.schema();

match self.try_plan_async_exprs(
num_input_columns,
PlannedExprResult::ExprWithName(physical_exprs),
input_schema.as_ref(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => {
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
}
PlanAsyncExpr::Async(
async_map,
PlannedExprResult::ExprWithName(physical_exprs),
) => {
let async_exec = AsyncFuncExec::try_new(async_map.async_exprs, input)?;
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
Ok(Arc::new(ProjectionExec::try_new(
proj_exprs,
Arc::new(async_exec),
)?))
}
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),
}
}
}

/// Expand and align a GROUPING SET expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ impl TableProvider for CustomProvider {
filters: &[Expr],
_: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let empty = Vec::new();
let projection = projection.unwrap_or(&empty);
// None means "all columns", Some(empty) means "no columns"
let select_all_columns = projection.is_none() || !projection.unwrap().is_empty();
match &filters[0] {
Expr::BinaryExpr(BinaryExpr { right, .. }) => {
let int_value = match &**right {
Expand Down Expand Up @@ -215,9 +215,10 @@ impl TableProvider for CustomProvider {
};

Ok(Arc::new(CustomPlan::new(
match projection.is_empty() {
true => Arc::new(Schema::empty()),
false => self.zero_batch.schema(),
if select_all_columns {
self.zero_batch.schema()
} else {
Arc::new(Schema::empty())
},
match int_value {
0 => vec![self.zero_batch.clone()],
Expand All @@ -227,9 +228,10 @@ impl TableProvider for CustomProvider {
)))
}
_ => Ok(Arc::new(CustomPlan::new(
match projection.is_empty() {
true => Arc::new(Schema::empty()),
false => self.zero_batch.schema(),
if select_all_columns {
self.zero_batch.schema()
} else {
Arc::new(Schema::empty())
},
vec![],
))),
Expand Down
9 changes: 2 additions & 7 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,13 +520,8 @@ impl LogicalPlanBuilder {
{
let sub_plan = p.into_owned();

if let Some(proj) = table_scan.projection {
let projection_exprs = proj
.into_iter()
.map(|i| {
Expr::Column(Column::from(sub_plan.schema().qualified_field(i)))
})
.collect::<Vec<_>>();
if let Some(projection_exprs) = table_scan.projection {
// projection is now Vec<Expr>, use directly
return Self::new(sub_plan)
.project(projection_exprs)?
.alias(table_scan.table_name);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pub use plan::{
EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection,
RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
projection_schema,
SubqueryAlias, TableScan, TableScanBuilder, ToStringifiedPlan, Union, Unnest, Values,
Window, projection_schema,
};
pub use statement::{
Deallocate, Execute, Prepare, ResetVariable, SetVariable, Statement,
Expand Down
Loading
Loading