From 2f22ecef60fa3bd0b06f7adb232dc4a01b926e1f Mon Sep 17 00:00:00 2001 From: codedump Date: Sat, 24 Jan 2026 17:15:15 +0800 Subject: [PATCH 1/4] refactor: unify SQL planning for ORDER BY, HAVING, DISTINCT, etc --- datafusion/expr/src/logical_plan/builder.rs | 23 +- datafusion/sql/src/expr/order_by.rs | 298 +++++++++++++++++++- datafusion/sql/src/query.rs | 16 +- datafusion/sql/src/select.rs | 109 +++++-- datafusion/sql/tests/cases/plan_to_sql.rs | 2 +- 5 files changed, 426 insertions(+), 22 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2e23fef1da768..a1dffc2563e1a 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -800,6 +800,27 @@ impl LogicalPlanBuilder { self, sorts: impl IntoIterator> + Clone, fetch: Option, + ) -> Result { + self.sort_with_limit_inner(sorts, fetch, false) + } + + /// Apply a sort with option to skip adding missing columns + /// + /// This is used by SELECT statements where missing ORDER BY columns are + /// already added by `add_missing_order_by_exprs`. + pub fn sort_with_limit_skip_missing( + self, + sorts: impl IntoIterator> + Clone, + fetch: Option, + ) -> Result { + self.sort_with_limit_inner(sorts, fetch, true) + } + + fn sort_with_limit_inner( + self, + sorts: impl IntoIterator> + Clone, + fetch: Option, + skip_add_missing_columns: bool, ) -> Result { let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?; @@ -820,7 +841,7 @@ impl LogicalPlanBuilder { Ok(()) })?; - if missing_cols.is_empty() { + if missing_cols.is_empty() || skip_add_missing_columns { return Ok(Self::new(LogicalPlan::Sort(Sort { expr: normalize_sorts(sorts, &self.plan)?, input: self.plan, diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index faecfbcfecc05..a1d2caced5a54 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -15,12 +15,18 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; + use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; +use datafusion_common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; use datafusion_common::{ - Column, DFSchema, Result, not_impl_err, plan_datafusion_err, plan_err, + Column, DFSchema, DFSchemaRef, Result, not_impl_err, plan_datafusion_err, plan_err, }; use datafusion_expr::expr::Sort; use datafusion_expr::{Expr, SortExpr}; +use indexmap::IndexSet; use sqlparser::ast::{ Expr as SQLExpr, OrderByExpr, OrderByOptions, Value, ValueWithSpan, }; @@ -117,4 +123,294 @@ impl SqlToRel<'_, S> { Ok(sort_expr_vec) } + + /// Add missing ORDER BY expressions to the SELECT list. + /// + /// This function handles the case where ORDER BY expressions reference columns + /// or expressions that are not present in the SELECT list. Instead of traversing + /// the plan tree to find projection nodes, it directly adds the missing + /// expressions to the SELECT list. + /// + /// # Behavior + /// + /// - For aggregate functions (e.g., `SUM(x)`) and window functions, the original + /// expression is added to the SELECT list, and the ORDER BY expression is + /// replaced with a column reference to that expression's output name. + /// + /// - For column references that don't exist in the current schema, the column + /// reference itself is added to the SELECT list. + /// + /// - If the query uses `SELECT DISTINCT` and there are missing ORDER BY + /// expressions, an error is returned, as this would make the DISTINCT + /// operation ambiguous. + /// + /// - Aliases defined in the SELECT list are recognized and used to replace + /// the corresponding expressions in ORDER BY with column references. + /// + /// - When `strict` is true (e.g., when GROUP BY is present), ORDER BY + /// expressions must already be in the SELECT list, be an alias, or be an + /// aggregate/window function. Missing expressions will cause an error instead + /// of being added to the SELECT list. This preserves the error message + /// "Column in ORDER BY must be in GROUP BY" for invalid queries. + /// + /// # Arguments + /// + /// * `select_exprs` - Mutable reference to the SELECT expressions list. Missing + /// expressions will be added to this list (unless strict is true). + /// * `schema` - The schema of the projected plan, used to check if column + /// references exist. + /// * `distinct` - Whether the query uses `SELECT DISTINCT`. If true, missing + /// ORDER BY expressions will cause an error. + /// * `strict` - Whether to strictly validate ORDER BY expressions. If true, + /// missing expressions will cause an error instead of being added. + /// * `order_by` - Mutable slice of ORDER BY expressions. The expressions will + /// be rewritten to use column references where appropriate. + /// + /// # Returns + /// + /// * `Ok(true)` - If expressions were added to the SELECT list. + /// * `Ok(false)` - If no expressions needed to be added. + /// * `Err(...)` - If there's an error (e.g., DISTINCT with missing ORDER BY + /// expressions). + /// + /// # Example + /// + /// ```text + /// Input: SELECT x FROM foo ORDER BY y + /// + /// Before: select_exprs = [x] + /// order_by = [Sort { expr: Column(y), ... }] + /// + /// After: select_exprs = [x, y] + /// order_by = [Sort { expr: Column(y), ... }] + /// returns Ok(true) + /// ``` + pub(crate) fn add_missing_order_by_exprs( + select_exprs: &mut Vec, + schema: &DFSchemaRef, + distinct: bool, + strict: bool, + order_by: &mut [Sort], + ) -> Result { + add_missing_order_by_exprs_impl(select_exprs, schema, distinct, strict, order_by) + } +} + +/// Internal implementation of add_missing_order_by_exprs for testability. +fn add_missing_order_by_exprs_impl( + select_exprs: &mut Vec, + schema: &DFSchemaRef, + distinct: bool, + strict: bool, + order_by: &mut [Sort], +) -> Result { + let mut missing_exprs: IndexSet = IndexSet::new(); + + let mut aliases = HashMap::new(); + for expr in select_exprs.iter() { + if let Expr::Alias(alias) = expr { + aliases.insert(alias.expr.clone(), alias.name.clone()); + } + } + + let mut rewrite = |expr: Expr| { + if select_exprs.contains(&expr) { + return Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)); + } + if let Some(alias) = aliases.get(&expr) { + return Ok(Transformed::new( + Expr::Column(Column::new_unqualified(alias.clone())), + false, + TreeNodeRecursion::Jump, + )); + } + match expr { + Expr::AggregateFunction(_) | Expr::WindowFunction(_) => { + let replaced = Expr::Column(Column::new_unqualified( + expr.schema_name().to_string(), + )); + missing_exprs.insert(expr); + Ok(Transformed::new(replaced, true, TreeNodeRecursion::Jump)) + } + Expr::Column(ref c) => { + if strict { + // In strict mode (e.g., GROUP BY present), the column must exist in schema + // If it doesn't exist and isn't in select_exprs, we'll error later + // Don't add it to missing_exprs to preserve proper error message + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } else if !schema.has_column(c) { + missing_exprs.insert(Expr::Column(c.clone())); + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } else { + Ok(Transformed::new(expr, false, TreeNodeRecursion::Jump)) + } + } + _ => Ok(Transformed::no(expr)), + } + }; + for sort in order_by.iter_mut() { + let expr = std::mem::take(&mut sort.expr); + sort.expr = expr.transform_down(&mut rewrite).data()?; + } + if !missing_exprs.is_empty() { + if distinct { + plan_err!( + "For SELECT DISTINCT, ORDER BY expressions {} must appear in select list", + missing_exprs[0] + ) + } else { + select_exprs.extend(missing_exprs); + Ok(true) + } + } else { + Ok(false) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::expr::Alias; + + fn create_test_schema() -> DFSchemaRef { + let fields = vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + ]; + DFSchemaRef::new( + DFSchema::from_unqualified_fields(fields.into(), HashMap::new()).unwrap(), + ) + } + + #[test] + fn test_add_missing_column_not_in_select() { + let schema = create_test_schema(); + let mut select_exprs = vec![col("a")]; + let mut order_by = vec![col("d").sort(true, false)]; // d is not in schema + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + // d is not in schema, so it should be added + assert!(result.unwrap()); + assert_eq!(select_exprs.len(), 2); + assert!(select_exprs.contains(&col("a"))); + assert!(select_exprs.contains(&col("d"))); + } + + #[test] + fn test_no_missing_column_when_already_in_select() { + let schema = create_test_schema(); + let mut select_exprs = vec![col("a"), col("b")]; + let mut order_by = vec![col("b").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + assert!(!result.unwrap()); + assert_eq!(select_exprs.len(), 2); + } + + #[test] + fn test_alias_resolution() { + let schema = create_test_schema(); + // SELECT a AS x, b + let mut select_exprs = vec![ + Expr::Alias(Alias::new(col("a"), None::<&str>, "x")), + col("b"), + ]; + // ORDER BY a (should be resolved to alias x) + let mut order_by = vec![col("a").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + // No new expressions should be added (a is resolved to alias x) + assert!(!result.unwrap()); + // ORDER BY a should be replaced with Column(x) reference + assert_eq!(order_by[0].expr, col("x")); + } + + #[test] + fn test_distinct_with_missing_column_error() { + let schema = create_test_schema(); + // SELECT DISTINCT a + // ORDER BY d (d is not in select, not in schema) + let mut select_exprs = vec![col("a")]; + let mut order_by = vec![col("d").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + true, // distinct = true + false, + &mut order_by, + ); + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!(err_msg.contains("SELECT DISTINCT")); + assert!(err_msg.contains("must appear in select list")); + } + + #[test] + fn test_strict_mode_no_add() { + let schema = create_test_schema(); + let mut select_exprs = vec![col("a")]; + let mut order_by = vec![col("b").sort(true, false)]; + + // strict = true should NOT add missing columns + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + true, // strict = true + &mut order_by, + ); + + assert!(!result.unwrap()); + assert_eq!(select_exprs.len(), 1); // b was not added + } + + #[test] + fn test_column_in_order_by_not_in_select_or_schema() { + let schema = create_test_schema(); + // SELECT a, b + // ORDER BY d - d is not in schema (would come from FROM clause in real scenario) + let mut select_exprs = vec![col("a"), col("b")]; + let mut order_by = vec![col("d").sort(true, false)]; + + let result = add_missing_order_by_exprs_impl( + &mut select_exprs, + &schema, + false, + false, + &mut order_by, + ); + + // d should be added to select_exprs + assert!(result.unwrap()); + assert!(select_exprs.contains(&col("d"))); + } + + fn col(name: &str) -> Expr { + Expr::Column(Column::new_unqualified(name)) + } } diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 1b7bb856a592b..aca6238943b4c 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -94,7 +94,9 @@ impl SqlToRel<'_, S> { true, None, )?; - let plan = self.order_by(plan, order_by_rex)?; + // Pass false to skip_add_missing_columns because for non-SELECT set expressions + // (like UNION), we still need to use add_missing_columns in sort_with_limit + let plan = self.order_by(plan, order_by_rex, false)?; self.limit(plan, limit_clause, planner_context) } }?; @@ -134,7 +136,8 @@ impl SqlToRel<'_, S> { true, None, )?; - self.order_by(plan, sort_exprs) + // For pipe operator ORDER BY, use add_missing_columns behavior + self.order_by(plan, sort_exprs, false) } PipeOperator::Limit { expr, offset } => self.limit( plan, @@ -299,10 +302,15 @@ impl SqlToRel<'_, S> { } /// Wrap the logical in a sort + /// + /// If `skip_add_missing_columns` is true, the method will not try to add + /// missing columns to the input plan. This is used by SELECT statements + /// where missing ORDER BY columns are already added by `add_missing_order_by_exprs`. pub(super) fn order_by( &self, plan: LogicalPlan, order_by: Vec, + skip_add_missing_columns: bool, ) -> Result { if order_by.is_empty() { return Ok(plan); @@ -313,6 +321,10 @@ impl SqlToRel<'_, S> { // optimization we're effectively doing a `first_value` aggregation according to them. let distinct_on = distinct_on.clone().with_sort_expr(order_by)?; Ok(LogicalPlan::Distinct(Distinct::On(distinct_on))) + } else if skip_add_missing_columns { + LogicalPlanBuilder::from(plan) + .sort_with_limit_skip_missing(order_by, None)? + .build() } else { LogicalPlanBuilder::from(plan).sort(order_by)?.build() } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 1d6ccde6be13a..6bb77017768c9 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -110,11 +110,11 @@ impl SqlToRel<'_, S> { )?; // Having and group by clause may reference aliases defined in select projection - let projected_plan = self.project(base_plan.clone(), select_exprs)?; - let select_exprs = projected_plan.expressions(); + let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?; + let projected_plan_exprs = projected_plan.expressions(); let order_by = - to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?; + to_order_by_exprs_with_select(query_order_by, Some(&projected_plan_exprs))?; // Place the fields of the base plan at the front so that when there are references // with the same name, the fields of the base plan will be searched first. @@ -131,11 +131,56 @@ impl SqlToRel<'_, S> { true, Some(base_plan.schema().as_ref()), )?; - let order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?; + let mut order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?; + + // Check if there are any Wildcards in select_exprs + let has_wildcard = select_exprs.iter().any(|e| { + matches!( + e, + SelectExpr::Wildcard(_) | SelectExpr::QualifiedWildcard(_, _) + ) + }); + + // Convert SelectExpr to Expr for add_missing_order_by_exprs + // If there's a wildcard, we need to use the expanded expressions from the projected plan + // Otherwise, we filter out non-Expression items + let mut projected_select_exprs: Vec = if has_wildcard { + projected_plan.expressions() + } else { + select_exprs + .iter() + .filter_map(|e| match e { + SelectExpr::Expression(expr) => Some(expr.clone()), + _ => None, + }) + .collect() + }; - // This alias map is resolved and looked up in both having exprs and group by exprs - let alias_map = extract_aliases(&select_exprs); + // Check if we need strict mode: GROUP BY present or aggregates in SELECT/ORDER BY + // Note: GroupByExpr::Expressions(vec![], _) means no GROUP BY, so we need to check + // if the expressions list is non-empty + let has_group_by = match &select.group_by { + GroupByExpr::Expressions(exprs, _) => !exprs.is_empty(), + GroupByExpr::All(_) => true, + }; + let has_aggregates = { + let select_aggrs = find_aggregate_exprs(projected_select_exprs.iter()); + let order_by_aggrs = + find_aggregate_exprs(order_by_rex.iter().map(|s| &s.expr)); + !select_aggrs.is_empty() || !order_by_aggrs.is_empty() + }; + let strict = has_group_by || has_aggregates; + + let added = Self::add_missing_order_by_exprs( + &mut projected_select_exprs, + projected_plan.schema(), + matches!(select.distinct, Some(Distinct::Distinct)), + strict, + &mut order_by_rex, + )?; + // This alias map is resolved and looked up in both having exprs and group by exprs + let alias_map = extract_aliases(&projected_select_exprs); // Optionally the HAVING expression. let having_expr_opt = select .having @@ -181,8 +226,10 @@ impl SqlToRel<'_, S> { } let group_by_expr = resolve_aliases_to_exprs(group_by_expr, &alias_map)?; - let group_by_expr = - resolve_positions_to_exprs(group_by_expr, &select_exprs)?; + let group_by_expr = resolve_positions_to_exprs( + group_by_expr, + &projected_select_exprs, + )?; let group_by_expr = normalize_col(group_by_expr, &projected_plan)?; self.validate_schema_satisfies_exprs( base_plan.schema(), @@ -194,7 +241,7 @@ impl SqlToRel<'_, S> { } else { // 'group by all' groups wrt. all select expressions except 'AggregateFunction's. // Filter and collect non-aggregate select expressions - select_exprs + projected_select_exprs .iter() .filter(|select_expr| match select_expr { Expr::AggregateFunction(_) => false, @@ -237,7 +284,7 @@ impl SqlToRel<'_, S> { // The outer expressions we will search through for aggregates. // First, find aggregates in SELECT, HAVING, and QUALIFY let select_having_qualify_aggrs = find_aggregate_exprs( - select_exprs + projected_select_exprs .iter() .chain(having_expr_opt.iter()) .chain(qualify_expr_opt.iter()), @@ -265,7 +312,7 @@ impl SqlToRel<'_, S> { } = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() { self.aggregate( &base_plan, - &select_exprs, + projected_select_exprs.as_slice(), having_expr_opt.as_ref(), qualify_expr_opt.as_ref(), &order_by_rex, @@ -281,7 +328,7 @@ impl SqlToRel<'_, S> { } None => AggregatePlanResult { plan: base_plan.clone(), - select_exprs: select_exprs.clone(), + select_exprs: projected_select_exprs.clone(), having_expr: having_expr_opt, qualify_expr: qualify_expr_opt, order_by_exprs: order_by_rex, @@ -383,7 +430,7 @@ impl SqlToRel<'_, S> { // Build the final plan LogicalPlanBuilder::from(base_plan) - .distinct_on(on_expr, select_exprs, None)? + .distinct_on(on_expr, projected_select_exprs.clone(), None)? .build() } }?; @@ -408,8 +455,26 @@ impl SqlToRel<'_, S> { plan }; - let plan = self.order_by(plan, order_by_rex)?; - Ok(plan) + // For non-aggregate queries (no GROUP BY and no aggregates), we can skip + // add_missing_columns because add_missing_order_by_exprs has already added + // the missing columns. For aggregate queries, we still need add_missing_columns + // to handle complex cases like ORDER BY count(*). + let skip_add_missing = !strict; + let plan = self.order_by(plan, order_by_rex, skip_add_missing)?; + // if add missing columns, we MUST remove unused columns in project + if added { + LogicalPlanBuilder::from(plan) + .project( + projected_plan + .schema() + .columns() + .into_iter() + .map(Expr::Column), + )? + .build() + } else { + Ok(plan) + } } /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection @@ -428,8 +493,13 @@ impl SqlToRel<'_, S> { .iter() .any(has_unnest_expr_recursively) { + // Explicitly convert Expr to SelectExpr to ensure all expressions are properly included + let select_exprs: Vec = intermediate_select_exprs + .into_iter() + .map(SelectExpr::Expression) + .collect(); return LogicalPlanBuilder::from(intermediate_plan) - .project(intermediate_select_exprs)? + .project(select_exprs)? .build(); } @@ -459,7 +529,12 @@ impl SqlToRel<'_, S> { // The original expr does not contain any unnest if i == 0 { return LogicalPlanBuilder::from(intermediate_plan) - .project(intermediate_select_exprs)? + .project( + intermediate_select_exprs + .iter() + .cloned() + .map(SelectExpr::Expression), + )? .build(); } break; diff --git a/datafusion/sql/tests/cases/plan_to_sql.rs b/datafusion/sql/tests/cases/plan_to_sql.rs index 4717b843abb53..042a3dd51ae00 100644 --- a/datafusion/sql/tests/cases/plan_to_sql.rs +++ b/datafusion/sql/tests/cases/plan_to_sql.rs @@ -1984,7 +1984,7 @@ fn test_complex_order_by_with_grouping() -> Result<()> { }, { assert_snapshot!( sql, - @r#"SELECT j1.j1_id, j1.j1_string, lochierarchy FROM (SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy, grouping(j1.j1_string), grouping(j1.j1_id) FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string)) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (("grouping(j1.j1_id)" + "grouping(j1.j1_string)") = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"# + @r#"SELECT j1.j1_id, j1.j1_string, (grouping(j1.j1_id) + grouping(j1.j1_string)) AS lochierarchy FROM j1 GROUP BY ROLLUP (j1.j1_id, j1.j1_string) ORDER BY lochierarchy DESC NULLS FIRST, CASE WHEN (lochierarchy = 0) THEN j1.j1_id END ASC NULLS LAST LIMIT 100"# ); }); From 9aa2f9b94ea36b035c4999adbcf2b4bd8b189b7d Mon Sep 17 00:00:00 2001 From: codedump Date: Fri, 30 Jan 2026 23:13:04 +0800 Subject: [PATCH 2/4] refactor: unify SQL planning for ORDER BY, HAVING, DISTINCT, etc --- datafusion/expr/src/logical_plan/builder.rs | 23 +-------------------- datafusion/sql/src/expr/function.rs | 18 +++------------- datafusion/sql/src/expr/order_by.rs | 20 +++--------------- datafusion/sql/src/query.rs | 18 ++-------------- datafusion/sql/src/select.rs | 8 +------ datafusion/sql/src/statement.rs | 5 ++--- 6 files changed, 12 insertions(+), 80 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index a1dffc2563e1a..2e23fef1da768 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -800,27 +800,6 @@ impl LogicalPlanBuilder { self, sorts: impl IntoIterator> + Clone, fetch: Option, - ) -> Result { - self.sort_with_limit_inner(sorts, fetch, false) - } - - /// Apply a sort with option to skip adding missing columns - /// - /// This is used by SELECT statements where missing ORDER BY columns are - /// already added by `add_missing_order_by_exprs`. - pub fn sort_with_limit_skip_missing( - self, - sorts: impl IntoIterator> + Clone, - fetch: Option, - ) -> Result { - self.sort_with_limit_inner(sorts, fetch, true) - } - - fn sort_with_limit_inner( - self, - sorts: impl IntoIterator> + Clone, - fetch: Option, - skip_add_missing_columns: bool, ) -> Result { let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?; @@ -841,7 +820,7 @@ impl LogicalPlanBuilder { Ok(()) })?; - if missing_cols.is_empty() || skip_add_missing_columns { + if missing_cols.is_empty() { return Ok(Self::new(LogicalPlan::Sort(Sort { expr: normalize_sorts(sorts, &self.plan)?, input: self.plan, diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 641f3bb8dcad1..a76ed1a05763a 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -395,7 +395,6 @@ impl SqlToRel<'_, S> { planner_context, // Numeric literals in window function ORDER BY are treated as constants false, - None, )?; let func_deps = schema.functional_dependencies(); @@ -568,13 +567,7 @@ impl SqlToRel<'_, S> { } else { within_group }; - self.order_by_to_sort_expr( - order_by, - schema, - planner_context, - true, - None, - )? + self.order_by_to_sort_expr(order_by, schema, planner_context, true)? }; let filter: Option> = filter @@ -875,13 +868,8 @@ impl SqlToRel<'_, S> { schema: &DFSchema, planner_context: &mut PlannerContext, ) -> Result { - let within_group = self.order_by_to_sort_expr( - within_group, - schema, - planner_context, - false, - None, - )?; + let within_group = + self.order_by_to_sort_expr(within_group, schema, planner_context, false)?; if !within_group.is_empty() { let within_group_count = within_group.len(); diff --git a/datafusion/sql/src/expr/order_by.rs b/datafusion/sql/src/expr/order_by.rs index a1d2caced5a54..9a6e30d67c7bf 100644 --- a/datafusion/sql/src/expr/order_by.rs +++ b/datafusion/sql/src/expr/order_by.rs @@ -51,22 +51,11 @@ impl SqlToRel<'_, S> { input_schema: &DFSchema, planner_context: &mut PlannerContext, literal_to_column: bool, - additional_schema: Option<&DFSchema>, ) -> Result> { if order_by_exprs.is_empty() { return Ok(vec![]); } - let mut combined_schema; - let order_by_schema = match additional_schema { - Some(schema) => { - combined_schema = input_schema.clone(); - combined_schema.merge(schema); - &combined_schema - } - None => input_schema, - }; - let mut sort_expr_vec = Vec::with_capacity(order_by_exprs.len()); let make_sort_expr = |expr: Expr, @@ -114,9 +103,7 @@ impl SqlToRel<'_, S> { input_schema.qualified_field(field_index - 1), )) } - e => { - self.sql_expr_to_logical_expr(e, order_by_schema, planner_context)? - } + e => self.sql_expr_to_logical_expr(e, input_schema, planner_context)?, }; sort_expr_vec.push(make_sort_expr(expr, asc, nulls_first)); } @@ -226,9 +213,8 @@ fn add_missing_order_by_exprs_impl( } match expr { Expr::AggregateFunction(_) | Expr::WindowFunction(_) => { - let replaced = Expr::Column(Column::new_unqualified( - expr.schema_name().to_string(), - )); + let replaced = + Expr::Column(Column::new_unqualified(expr.schema_name().to_string())); missing_exprs.insert(expr); Ok(Transformed::new(replaced, true, TreeNodeRecursion::Jump)) } diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index aca6238943b4c..7d38a3e8b7f55 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -92,11 +92,8 @@ impl SqlToRel<'_, S> { plan.schema(), planner_context, true, - None, )?; - // Pass false to skip_add_missing_columns because for non-SELECT set expressions - // (like UNION), we still need to use add_missing_columns in sort_with_limit - let plan = self.order_by(plan, order_by_rex, false)?; + let plan = self.order_by(plan, order_by_rex)?; self.limit(plan, limit_clause, planner_context) } }?; @@ -134,10 +131,8 @@ impl SqlToRel<'_, S> { plan.schema(), planner_context, true, - None, )?; - // For pipe operator ORDER BY, use add_missing_columns behavior - self.order_by(plan, sort_exprs, false) + self.order_by(plan, sort_exprs) } PipeOperator::Limit { expr, offset } => self.limit( plan, @@ -302,15 +297,10 @@ impl SqlToRel<'_, S> { } /// Wrap the logical in a sort - /// - /// If `skip_add_missing_columns` is true, the method will not try to add - /// missing columns to the input plan. This is used by SELECT statements - /// where missing ORDER BY columns are already added by `add_missing_order_by_exprs`. pub(super) fn order_by( &self, plan: LogicalPlan, order_by: Vec, - skip_add_missing_columns: bool, ) -> Result { if order_by.is_empty() { return Ok(plan); @@ -321,10 +311,6 @@ impl SqlToRel<'_, S> { // optimization we're effectively doing a `first_value` aggregation according to them. let distinct_on = distinct_on.clone().with_sort_expr(order_by)?; Ok(LogicalPlan::Distinct(Distinct::On(distinct_on))) - } else if skip_add_missing_columns { - LogicalPlanBuilder::from(plan) - .sort_with_limit_skip_missing(order_by, None)? - .build() } else { LogicalPlanBuilder::from(plan).sort(order_by)?.build() } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 6bb77017768c9..f8968f17c890f 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -129,7 +129,6 @@ impl SqlToRel<'_, S> { projected_plan.schema().as_ref(), planner_context, true, - Some(base_plan.schema().as_ref()), )?; let mut order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?; @@ -455,12 +454,7 @@ impl SqlToRel<'_, S> { plan }; - // For non-aggregate queries (no GROUP BY and no aggregates), we can skip - // add_missing_columns because add_missing_order_by_exprs has already added - // the missing columns. For aggregate queries, we still need add_missing_columns - // to handle complex cases like ORDER BY count(*). - let skip_add_missing = !strict; - let plan = self.order_by(plan, order_by_rex, skip_add_missing)?; + let plan = self.order_by(plan, order_by_rex)?; // if add missing columns, we MUST remove unused columns in project if added { LogicalPlanBuilder::from(plan) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 4981db5537a74..2ace303e33e60 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1322,7 +1322,7 @@ impl SqlToRel<'_, S> { let function_body = match function_body { Some(r) => Some(self.sql_to_expr( match r { - // `link_symbol` indicates if the primary expression contains the name of shared library file. + // `link_symbol` indicates if the primary expression contains the name of shared library file. ast::CreateFunctionBody::AsBeforeOptions{body: expr, link_symbol: _link_symbol} => expr, ast::CreateFunctionBody::AsAfterOptions(expr) => expr, ast::CreateFunctionBody::Return(expr) => expr, @@ -1468,7 +1468,6 @@ impl SqlToRel<'_, S> { &table_schema, planner_context, false, - None, )?; Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex( PlanCreateIndex { @@ -1661,7 +1660,7 @@ impl SqlToRel<'_, S> { for expr in order_exprs { // Convert each OrderByExpr to a SortExpr: let expr_vec = - self.order_by_to_sort_expr(expr, schema, planner_context, true, None)?; + self.order_by_to_sort_expr(expr, schema, planner_context, true)?; // Verify that columns of all SortExprs exist in the schema: for sort in expr_vec.iter() { for column in sort.expr.column_refs().iter() { From 305612903e97fbfa610e12d94c2cf1a56d61a6e5 Mon Sep 17 00:00:00 2001 From: codedump Date: Fri, 30 Jan 2026 23:41:02 +0800 Subject: [PATCH 3/4] refactor: unify SQL planning for ORDER BY, HAVING, DISTINCT, etc --- datafusion/expr/src/logical_plan/builder.rs | 125 +------------------- 1 file changed, 3 insertions(+), 122 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2e23fef1da768..3a6c5c2f16588 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -25,7 +25,7 @@ use std::iter::once; use std::sync::Arc; use crate::dml::CopyTo; -use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr}; +use crate::expr::{PlannedReplaceSelectItem, Sort as SortExpr}; use crate::expr_rewriter::{ coerce_plan_expr_for_schema, normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts, @@ -664,118 +664,6 @@ impl LogicalPlanBuilder { subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new) } - /// Add missing sort columns to all downstream projection - /// - /// Thus, if you have a LogicalPlan that selects A and B and have - /// not requested a sort by C, this code will add C recursively to - /// all input projections. - /// - /// Adding a new column is not correct if there is a `Distinct` - /// node, which produces only distinct values of its - /// inputs. Adding a new column to its input will result in - /// potentially different results than with the original column. - /// - /// For example, if the input is like: - /// - /// Distinct(A, B) - /// - /// If the input looks like - /// - /// a | b | c - /// --+---+--- - /// 1 | 2 | 3 - /// 1 | 2 | 4 - /// - /// Distinct (A, B) --> (1,2) - /// - /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4) - /// (which will appear as a (1, 2), (1, 2) if a and b are projected - /// - /// See for more details - fn add_missing_columns( - curr_plan: LogicalPlan, - missing_cols: &IndexSet, - is_distinct: bool, - ) -> Result { - match curr_plan { - LogicalPlan::Projection(Projection { - input, - mut expr, - schema: _, - }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => { - let mut missing_exprs = missing_cols - .iter() - .map(|c| normalize_col(Expr::Column(c.clone()), &input)) - .collect::>>()?; - - // Do not let duplicate columns to be added, some of the - // missing_cols may be already present but without the new - // projected alias. - missing_exprs.retain(|e| !expr.contains(e)); - if is_distinct { - Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?; - } - expr.extend(missing_exprs); - project(Arc::unwrap_or_clone(input), expr) - } - _ => { - let is_distinct = - is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_)); - let new_inputs = curr_plan - .inputs() - .into_iter() - .map(|input_plan| { - Self::add_missing_columns( - (*input_plan).clone(), - missing_cols, - is_distinct, - ) - }) - .collect::>>()?; - curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs) - } - } - } - - fn ambiguous_distinct_check( - missing_exprs: &[Expr], - missing_cols: &IndexSet, - projection_exprs: &[Expr], - ) -> Result<()> { - if missing_exprs.is_empty() { - return Ok(()); - } - - // if the missing columns are all only aliases for things in - // the existing select list, it is ok - // - // This handles the special case for - // SELECT col as ORDER BY - // - // As described in https://github.com/apache/datafusion/issues/5293 - let all_aliases = missing_exprs.iter().all(|e| { - projection_exprs.iter().any(|proj_expr| { - if let Expr::Alias(Alias { expr, .. }) = proj_expr { - e == expr.as_ref() - } else { - false - } - }) - }); - if all_aliases { - return Ok(()); - } - - let missing_col_names = missing_cols - .iter() - .map(|col| col.flat_name()) - .collect::(); - - plan_err!( - "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list" - ) - } - /// Apply a sort by provided expressions with default direction pub fn sort_by( self, @@ -831,16 +719,9 @@ impl LogicalPlanBuilder { // remove pushed down sort columns let new_expr = schema.columns().into_iter().map(Expr::Column).collect(); - let is_distinct = false; - let plan = Self::add_missing_columns( - Arc::unwrap_or_clone(self.plan), - &missing_cols, - is_distinct, - )?; - let sort_plan = LogicalPlan::Sort(Sort { - expr: normalize_sorts(sorts, &plan)?, - input: Arc::new(plan), + expr: normalize_sorts(sorts, &self.plan)?, + input: self.plan, fetch, }); From 15020d69bcbbff987010b1338607b7e190f6ebcc Mon Sep 17 00:00:00 2001 From: codedump Date: Sat, 31 Jan 2026 02:32:42 +0800 Subject: [PATCH 4/4] refactor: unify SQL planning for ORDER BY, HAVING, DISTINCT, etc --- datafusion/expr/src/logical_plan/builder.rs | 160 +++++++++++++++++++- 1 file changed, 155 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 3a6c5c2f16588..93ed106fe724b 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -25,7 +25,7 @@ use std::iter::once; use std::sync::Arc; use crate::dml::CopyTo; -use crate::expr::{PlannedReplaceSelectItem, Sort as SortExpr}; +use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr}; use crate::expr_rewriter::{ coerce_plan_expr_for_schema, normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts, @@ -664,6 +664,118 @@ impl LogicalPlanBuilder { subquery_alias(Arc::unwrap_or_clone(self.plan), alias).map(Self::new) } + /// Add missing sort columns to all downstream projection + /// + /// Thus, if you have a LogicalPlan that selects A and B and have + /// not requested a sort by C, this code will add C recursively to + /// all input projections. + /// + /// Adding a new column is not correct if there is a `Distinct` + /// node, which produces only distinct values of its + /// inputs. Adding a new column to its input will result in + /// potentially different results than with the original column. + /// + /// For example, if the input is like: + /// + /// Distinct(A, B) + /// + /// If the input looks like + /// + /// a | b | c + /// --+---+--- + /// 1 | 2 | 3 + /// 1 | 2 | 4 + /// + /// Distinct (A, B) --> (1,2) + /// + /// But Distinct (A, B, C) --> (1, 2, 3), (1, 2, 4) + /// (which will appear as a (1, 2), (1, 2) if a and b are projected + /// + /// See for more details + fn add_missing_columns( + curr_plan: LogicalPlan, + missing_cols: &IndexSet, + is_distinct: bool, + ) -> Result { + match curr_plan { + LogicalPlan::Projection(Projection { + input, + mut expr, + schema: _, + }) if missing_cols.iter().all(|c| input.schema().has_column(c)) => { + let mut missing_exprs = missing_cols + .iter() + .map(|c| normalize_col(Expr::Column(c.clone()), &input)) + .collect::>>()?; + + // Do not let duplicate columns to be added, some of the + // missing_cols may be already present but without the new + // projected alias. + missing_exprs.retain(|e| !expr.contains(e)); + if is_distinct { + Self::ambiguous_distinct_check(&missing_exprs, missing_cols, &expr)?; + } + expr.extend(missing_exprs); + project(Arc::unwrap_or_clone(input), expr) + } + _ => { + let is_distinct = + is_distinct || matches!(curr_plan, LogicalPlan::Distinct(_)); + let new_inputs = curr_plan + .inputs() + .into_iter() + .map(|input_plan| { + Self::add_missing_columns( + (*input_plan).clone(), + missing_cols, + is_distinct, + ) + }) + .collect::>>()?; + curr_plan.with_new_exprs(curr_plan.expressions(), new_inputs) + } + } + } + + fn ambiguous_distinct_check( + missing_exprs: &[Expr], + missing_cols: &IndexSet, + projection_exprs: &[Expr], + ) -> Result<()> { + if missing_exprs.is_empty() { + return Ok(()); + } + + // if the missing columns are all only aliases for things in + // the existing select list, it is ok + // + // This handles the special case for + // SELECT col as ORDER BY + // + // As described in https://github.com/apache/datafusion/issues/5293 + let all_aliases = missing_exprs.iter().all(|e| { + projection_exprs.iter().any(|proj_expr| { + if let Expr::Alias(Alias { expr, .. }) = proj_expr { + e == expr.as_ref() + } else { + false + } + }) + }); + if all_aliases { + return Ok(()); + } + + let missing_col_names = missing_cols + .iter() + .map(|col| col.flat_name()) + .collect::(); + + plan_err!( + "For SELECT DISTINCT, ORDER BY expressions {missing_col_names} must appear in select list" + ) + } + /// Apply a sort by provided expressions with default direction pub fn sort_by( self, @@ -688,10 +800,31 @@ impl LogicalPlanBuilder { self, sorts: impl IntoIterator> + Clone, fetch: Option, + ) -> Result { + self.sort_with_limit_inner(sorts, fetch, false) + } + + /// Apply a sort with option to skip adding missing columns + /// + /// This is used by SELECT statements where missing ORDER BY columns are + /// already added by `add_missing_order_by_exprs`. + pub fn sort_with_limit_skip_missing( + self, + sorts: impl IntoIterator> + Clone, + fetch: Option, + ) -> Result { + self.sort_with_limit_inner(sorts, fetch, true) + } + + fn sort_with_limit_inner( + self, + sorts: impl IntoIterator> + Clone, + fetch: Option, + skip_add_missing_columns: bool, ) -> Result { let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?; - let schema = self.plan.schema(); + let schema = Arc::clone(self.plan.schema()); // Collect sort columns that are missing in the input plan's schema let mut missing_cols: IndexSet = IndexSet::new(); @@ -708,7 +841,16 @@ impl LogicalPlanBuilder { Ok(()) })?; - if missing_cols.is_empty() { + // For DISTINCT queries, ORDER BY expressions must appear in the select list + // This check ensures consistent behavior between SQL and DataFrame API paths + if !missing_cols.is_empty() && matches!(&*self.plan, LogicalPlan::Distinct(_)) { + return plan_err!( + "For SELECT DISTINCT, ORDER BY expressions {} must appear in select list", + missing_cols.iter().next().unwrap() + ); + } + + if missing_cols.is_empty() || skip_add_missing_columns { return Ok(Self::new(LogicalPlan::Sort(Sort { expr: normalize_sorts(sorts, &self.plan)?, input: self.plan, @@ -716,12 +858,20 @@ impl LogicalPlanBuilder { }))); } + // Add missing columns to downstream projection + let is_distinct = false; + let plan = Self::add_missing_columns( + Arc::unwrap_or_clone(self.plan), + &missing_cols, + is_distinct, + )?; + // remove pushed down sort columns let new_expr = schema.columns().into_iter().map(Expr::Column).collect(); let sort_plan = LogicalPlan::Sort(Sort { - expr: normalize_sorts(sorts, &self.plan)?, - input: self.plan, + expr: normalize_sorts(sorts, &plan)?, + input: Arc::new(plan), fetch, });