Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,31 @@ impl LogicalPlanBuilder {
self,
sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
fetch: Option<usize>,
) -> Result<Self> {
self.sort_with_limit_inner(sorts, fetch, false)
}

/// Apply a sort with option to skip adding missing columns
///
/// This is used by SELECT statements where missing ORDER BY columns are
/// already added by `add_missing_order_by_exprs`.
pub fn sort_with_limit_skip_missing(
self,
sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
fetch: Option<usize>,
) -> Result<Self> {
self.sort_with_limit_inner(sorts, fetch, true)
}

fn sort_with_limit_inner(
self,
sorts: impl IntoIterator<Item = impl Into<SortExpr>> + Clone,
fetch: Option<usize>,
skip_add_missing_columns: bool,
) -> Result<Self> {
let sorts = rewrite_sort_cols_by_aggs(sorts, &self.plan)?;

let schema = self.plan.schema();
let schema = Arc::clone(self.plan.schema());

// Collect sort columns that are missing in the input plan's schema
let mut missing_cols: IndexSet<Column> = IndexSet::new();
Expand All @@ -820,24 +841,34 @@ impl LogicalPlanBuilder {
Ok(())
})?;

if missing_cols.is_empty() {
// For DISTINCT queries, ORDER BY expressions must appear in the select list
// This check ensures consistent behavior between SQL and DataFrame API paths
if !missing_cols.is_empty() && matches!(&*self.plan, LogicalPlan::Distinct(_)) {
return plan_err!(
"For SELECT DISTINCT, ORDER BY expressions {} must appear in select list",
missing_cols.iter().next().unwrap()
);
}

if missing_cols.is_empty() || skip_add_missing_columns {
return Ok(Self::new(LogicalPlan::Sort(Sort {
expr: normalize_sorts(sorts, &self.plan)?,
input: self.plan,
fetch,
})));
}

// remove pushed down sort columns
let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

// Add missing columns to downstream projection
let is_distinct = false;
let plan = Self::add_missing_columns(
Arc::unwrap_or_clone(self.plan),
&missing_cols,
is_distinct,
)?;

// remove pushed down sort columns
let new_expr = schema.columns().into_iter().map(Expr::Column).collect();

let sort_plan = LogicalPlan::Sort(Sort {
expr: normalize_sorts(sorts, &plan)?,
input: Arc::new(plan),
Expand Down
18 changes: 3 additions & 15 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
planner_context,
// Numeric literals in window function ORDER BY are treated as constants
false,
None,
)?;

let func_deps = schema.functional_dependencies();
Expand Down Expand Up @@ -568,13 +567,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
} else {
within_group
};
self.order_by_to_sort_expr(
order_by,
schema,
planner_context,
true,
None,
)?
self.order_by_to_sort_expr(order_by, schema, planner_context, true)?
};

let filter: Option<Box<Expr>> = filter
Expand Down Expand Up @@ -875,13 +868,8 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<WithinGroupExtraction> {
let within_group = self.order_by_to_sort_expr(
within_group,
schema,
planner_context,
false,
None,
)?;
let within_group =
self.order_by_to_sort_expr(within_group, schema, planner_context, false)?;

if !within_group.is_empty() {
let within_group_count = within_group.len();
Expand Down
Loading
Loading