diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 7d32f2a88fd9c..b30a9ceaeee6e 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1268,6 +1268,11 @@ config_namespace! {
         /// operator's built-in `partition_statistics`.
         pub use_statistics_registry: bool, default = false
 
+        /// When set to true, the physical plan optimizer will attempt to push
+        /// partial aggregations below joins when this reduces the number of
+        /// rows flowing into the join (eager aggregation).
+        pub eager_aggregation: bool, default = true
+
         /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
         /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
         pub prefer_hash_join: bool, default = true
diff --git a/datafusion/core/src/optimizer_rule_reference.md b/datafusion/core/src/optimizer_rule_reference.md
index fcbb200c71624..819455f37210d 100644
--- a/datafusion/core/src/optimizer_rule_reference.md
+++ b/datafusion/core/src/optimizer_rule_reference.md
@@ -72,22 +72,23 @@ in multiple phases.
 | 1 | `OutputRequirements` | add phase | Adds helper nodes so output requirements survive later physical rewrites. |
 | 2 | `aggregate_statistics` | - | Uses exact source statistics to answer some aggregates without scanning data. |
 | 3 | `join_selection` | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
-| 4 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
-| 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
-| 6 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
-| 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
-| 8 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
-| 9 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
-| 10 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
-| 11 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
-| 12 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
-| 13 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
-| 14 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
-| 15 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
-| 16 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
-| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
-| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
-| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
-| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
-| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
-| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
+| 4 | `EagerAggregation` | - | Pushes partial aggregations below joins when pre-aggregating by the join key reduces row count. |
+| 5 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
+| 6 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
+| 7 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
+| 8 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
+| 9 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
+| 10 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
+| 11 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
+| 12 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
+| 13 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
+| 14 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
+| 15 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
+| 16 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
+| 17 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
+| 18 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
+| 19 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
+| 20 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
+| 21 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
+| 22 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
+| 23 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
diff --git a/datafusion/physical-optimizer/src/eager_aggregation.rs b/datafusion/physical-optimizer/src/eager_aggregation.rs
new file mode 100644
index 0000000000000..d977acb3572e5
--- /dev/null
+++ b/datafusion/physical-optimizer/src/eager_aggregation.rs
@@ -0,0 +1,704 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`EagerAggregation`] pushes partial aggregations below joins when beneficial.
+//!
+//! This implements the "Eager Aggregation" optimization from Yan & Larson (VLDB 1995).
+//! When a query aggregates columns from one side of a join (the "fact" side),
+//! pre-aggregating by the join key before the join can dramatically reduce the
+//! number of rows flowing into the join.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+use crate::optimizer::{ConfigOnlyContext, PhysicalOptimizerContext};
+
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{JoinType, NullEquality, Result};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::joins::HashJoinExec;
+use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
+
+const MIN_REDUCTION_RATIO: f64 = 4.0;
+const MIN_DIMENSION_ROWS: usize = 4;
+
+#[derive(Default, Debug)]
+pub struct EagerAggregation;
+
+impl EagerAggregation {
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl PhysicalOptimizerRule for EagerAggregation {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        self.optimize_with_context(plan, &ConfigOnlyContext::new(config))
+    }
+
+    fn optimize_with_context(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        context: &dyn PhysicalOptimizerContext,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let config = context.config_options();
+        if !config.optimizer.eager_aggregation {
+            return Ok(plan);
+        }
+
+        let mut default_registry = None;
+        let registry: Option<&StatisticsRegistry> =
+            if config.optimizer.use_statistics_registry {
+                Some(context.statistics_registry().unwrap_or_else(|| {
+                    default_registry
+                        .insert(StatisticsRegistry::default_with_builtin_providers())
+                }))
+            } else {
+                None
+            };
+
+        plan.transform_down(|node| try_eager_aggregation(node, registry))
+            .data()
+    }
+
+    fn name(&self) -> &str {
+        "EagerAggregation"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+/// Determines which side of the join the aggregate columns reference.
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum AggregateSide {
+    Left,
+    Right,
+}
+
+/// Attempt to apply eager aggregation on a plan node.
+fn try_eager_aggregation(
+    plan: Arc<dyn ExecutionPlan>,
+    registry: Option<&StatisticsRegistry>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let Some(agg_exec) = plan.as_any().downcast_ref::<AggregateExec>() else {
+        return Ok(Transformed::no(plan));
+    };
+
+    // Only apply to Single or SinglePartitioned mode (pre-distribution enforcement)
+    match agg_exec.mode() {
+        AggregateMode::Single | AggregateMode::SinglePartitioned => {}
+        _ => return Ok(Transformed::no(plan)),
+    }
+
+    let Some(hash_join) = agg_exec.input().as_any().downcast_ref::<HashJoinExec>()
+    else {
+        return Ok(Transformed::no(plan));
+    };
+
+    // Only inner joins (avoids COUNT bug with outer joins)
+    if *hash_join.join_type() != JoinType::Inner {
+        return Ok(Transformed::no(plan));
+    }
+
+    // No residual filter (equi-join only)
+    if hash_join.filter().is_some() {
+        return Ok(Transformed::no(plan));
+    }
+
+    // No GROUPING SETS/ROLLUP/CUBE
+    if agg_exec.group_expr().has_grouping_set() {
+        return Ok(Transformed::no(plan));
+    }
+
+    // Must have at least one aggregate function
+    if agg_exec.aggr_expr().is_empty() {
+        return Ok(Transformed::no(plan));
+    }
+
+    // Validate all aggregates are eligible (no DISTINCT, no FILTER, no ORDER BY)
+    for aggr in agg_exec.aggr_expr().iter() {
+        if aggr.is_distinct() {
+            return Ok(Transformed::no(plan));
+        }
+        if !aggr.order_bys().is_empty() {
+            return Ok(Transformed::no(plan));
+        }
+    }
+    for filter in agg_exec.filter_expr().iter() {
+        if filter.is_some() {
+            return Ok(Transformed::no(plan));
+        }
+    }
+
+    // Determine the schema boundary between left and right join sides
+    let left_schema = hash_join.left().schema();
+    let left_col_count = left_schema.fields().len();
+
+    // Determine which side all aggregate input columns reference
+    let agg_side = match determine_aggregate_side(agg_exec, left_col_count)? {
+        Some(side) => side,
+        None => return Ok(Transformed::no(plan)),
+    };
+
+    // Verify join keys on the fact side are simple Column expressions
+    let join_on = hash_join.on();
+    for (left_key, right_key) in join_on.iter() {
+        let fact_key = match agg_side {
+            AggregateSide::Left => left_key,
+            AggregateSide::Right => right_key,
+        };
+        if fact_key.as_any().downcast_ref::<Column>().is_none() {
+            return Ok(Transformed::no(plan));
+        }
+    }
+
+    // Cost-based decision
+    if !passes_cost_heuristic(hash_join, agg_side, registry)? {
+        return Ok(Transformed::no(plan));
+    }
+
+    // All checks pass: construct the eager aggregation plan
+    construct_eager_plan(agg_exec, hash_join, agg_side, left_col_count)
+}
+
+/// Determine which side of the join all aggregate input columns reference.
+/// Returns None if columns reference both sides (cannot push down).
+fn determine_aggregate_side(
+    agg_exec: &AggregateExec,
+    left_col_count: usize,
+) -> Result<Option<AggregateSide>> {
+    let mut side: Option<AggregateSide> = None;
+
+    // Check aggregate function input columns
+    for aggr in agg_exec.aggr_expr().iter() {
+        for expr in aggr.expressions() {
+            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                let col_side = if col.index() < left_col_count {
+                    AggregateSide::Left
+                } else {
+                    AggregateSide::Right
+                };
+                match side {
+                    None => side = Some(col_side),
+                    Some(s) if s != col_side => return Ok(None),
+                    _ => {}
+                }
+            }
+        }
+    }
+
+    // Group-by columns can reference either side: dimension-side group-by
+    // columns pass through the join unchanged, while fact-side group-by
+    // columns are included in the lower aggregate. Only the aggregate
+    // *inputs* must all sit on one side.
+
+    // If no aggregate references any column (e.g. COUNT(*)), infer the fact
+    // side from the group-by columns: if they come from one side, push the
+    // aggregate to the OTHER side (the fact side).
+    if side.is_none() {
+        for (expr, _) in agg_exec.group_expr().expr() {
+            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                let col_side = if col.index() < left_col_count {
+                    AggregateSide::Left
+                } else {
+                    AggregateSide::Right
+                };
+                // If group-by cols are on the left, the fact side is the right,
+                // and vice versa
+                if side.is_none() {
+                    side = Some(match col_side {
+                        AggregateSide::Left => AggregateSide::Right,
+                        AggregateSide::Right => AggregateSide::Left,
+                    });
+                }
+            }
+        }
+    }
+
+    Ok(side)
+}
+
+/// Check cost heuristic: is the reduction ratio high enough to justify pre-aggregation?
+fn passes_cost_heuristic(
+    hash_join: &HashJoinExec,
+    agg_side: AggregateSide,
+    registry: Option<&StatisticsRegistry>,
+) -> Result<bool> {
+    let (fact_plan, dim_plan): (&dyn ExecutionPlan, &dyn ExecutionPlan) = match agg_side
+    {
+        AggregateSide::Left => (hash_join.left().as_ref(), hash_join.right().as_ref()),
+        AggregateSide::Right => (hash_join.right().as_ref(), hash_join.left().as_ref()),
+    };
+
+    let fact_stats = get_stats(fact_plan, registry)?;
+    let dim_stats = get_stats(dim_plan, registry)?;
+
+    let fact_rows = match fact_stats.num_rows.get_value() {
+        Some(&rows) if rows > 0 => rows,
+        _ => return Ok(false), // Can't decide without stats
+    };
+
+    let dim_rows = match dim_stats.num_rows.get_value() {
+        Some(&rows) if rows > 0 => rows,
+        _ => return Ok(false),
+    };
+
+    if dim_rows < MIN_DIMENSION_ROWS {
+        return Ok(false);
+    }
+
+    // Try NDV-based ratio first (more accurate)
+    let join_on = hash_join.on();
+    let fact_key_idx = match agg_side {
+        AggregateSide::Left => join_on
+            .first()
+            .and_then(|(k, _)| k.as_any().downcast_ref::<Column>())
+            .map(|c| c.index()),
+        AggregateSide::Right => join_on
+            .first()
+            .and_then(|(_, k)| k.as_any().downcast_ref::<Column>())
+            .map(|c| c.index()),
+    };
+
+    let ndv = fact_key_idx.and_then(|idx| {
+        fact_stats
+            .column_statistics
+            .get(idx)
+            .and_then(|cs| cs.distinct_count.get_value().copied())
+    });
+
+    let ratio = if let Some(ndv) = ndv {
+        if ndv == 0 {
+            return Ok(false);
+        }
+        fact_rows as f64 / ndv as f64
+    } else {
+        // Fallback: use dimension row count as proxy for NDV
+        fact_rows as f64 / dim_rows as f64
+    };
+
+    Ok(ratio >= MIN_REDUCTION_RATIO)
+}
+
+fn get_stats(
+    plan: &dyn ExecutionPlan,
+    registry: Option<&StatisticsRegistry>,
+) -> Result<Arc<datafusion_common::Statistics>> {
+    if let Some(reg) = registry {
+        reg.compute(plan).map(|s| Arc::clone(s.base_arc()))
+    } else {
+        plan.partition_statistics(None)
+    }
+}
+
+/// Construct the eager aggregation plan:
+/// Original: AggregateExec(mode=Single) -> HashJoinExec(left, right)
+/// New: AggregateExec(mode=Final) -> HashJoinExec(lower_agg, dim) -> AggregateExec(mode=Partial, fact)
+fn construct_eager_plan(
+    agg_exec: &AggregateExec,
+    hash_join: &HashJoinExec,
+    agg_side: AggregateSide,
+    left_col_count: usize,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    let join_on = hash_join.on();
+    let fact_child = match agg_side {
+        AggregateSide::Left => Arc::clone(hash_join.left()),
+        AggregateSide::Right => Arc::clone(hash_join.right()),
+    };
+    let dim_child = match agg_side {
+        AggregateSide::Left => Arc::clone(hash_join.right()),
+        AggregateSide::Right => Arc::clone(hash_join.left()),
+    };
+
+    let fact_schema = fact_child.schema();
+    let fact_col_offset = match agg_side {
+        AggregateSide::Left => 0usize,
+        AggregateSide::Right => left_col_count,
+    };
+
+    // Build group-by keys for the lower aggregate:
+    // 1. Join key columns from the fact side
+    // 2. Any original GROUP BY columns from the fact side
+    let mut lower_group_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = Vec::new();
+    let mut fact_join_key_indices: Vec<usize> = Vec::new();
+
+    for (left_key, right_key) in join_on.iter() {
+        let fact_key = match agg_side {
+            AggregateSide::Left => left_key,
+            AggregateSide::Right => right_key,
+        };
+        if let Some(col) = fact_key.as_any().downcast_ref::<Column>() {
+            let local_idx = col.index() - fact_col_offset;
+            let field_name = fact_schema.field(local_idx).name().clone();
+            lower_group_exprs
+                .push((Arc::new(Column::new(&field_name, local_idx)), field_name));
+            fact_join_key_indices.push(local_idx);
+        } else {
+            return Ok(Transformed::no(
+                Arc::new(agg_exec.clone()) as Arc<dyn ExecutionPlan>
+            ));
+        }
+    }
+
+    // Add original GROUP BY columns from fact side (not already included as join keys)
+    for (expr, alias) in agg_exec.group_expr().expr() {
+        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+            let idx = col.index();
+            let is_fact_side = match agg_side {
+                AggregateSide::Left => idx < left_col_count,
+                AggregateSide::Right => idx >= left_col_count,
+            };
+            if is_fact_side {
+                let local_idx = idx - fact_col_offset;
+                // Check if already in join keys
+                if !fact_join_key_indices.contains(&local_idx) {
+                    lower_group_exprs.push((
+                        Arc::new(Column::new(
+                            fact_schema.field(local_idx).name(),
+                            local_idx,
+                        )),
+                        alias.clone(),
+                    ));
+                }
+            }
+        }
+    }
+
+    // Remap aggregate function expressions so their column references point to
+    // the fact-side local schema. We reuse the same aggregate functions: the
+    // mode on AggregateExec determines whether update_batch or merge_batch
+    // is called.
+    let remapped_aggr_exprs = remap_aggregate_exprs_to_fact_side(
+        agg_exec.aggr_expr(),
+        fact_col_offset,
+        &fact_schema,
+    )?;
+
+    // Create the lower (Partial) AggregateExec
+    let lower_group_by = PhysicalGroupBy::new_single(lower_group_exprs.clone());
+    let lower_filter_exprs = vec![None; remapped_aggr_exprs.len()];
+
+    let lower_agg = AggregateExec::try_new(
+        AggregateMode::Partial,
+        lower_group_by,
+        remapped_aggr_exprs.clone(),
+        lower_filter_exprs,
+        fact_child,
+        fact_schema,
+    )?;
+    let lower_agg_arc: Arc<dyn ExecutionPlan> = Arc::new(lower_agg);
+    let lower_agg_schema = lower_agg_arc.schema();
+
+    // Build the new join with the lower aggregate replacing the fact side.
+    // Join keys must be remapped to lower aggregate output positions.
+    let new_join_on: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> = join_on
+        .iter()
+        .enumerate()
+        .map(|(i, (left_key, right_key))| {
+            // The join key in the lower aggregate is at position i (we added them first)
+            let lower_key_name = lower_agg_schema.field(i).name();
+            let lower_col: Arc<dyn PhysicalExpr> =
+                Arc::new(Column::new(lower_key_name, i));
+
+            match agg_side {
+                AggregateSide::Left => {
+                    // fact is left, dim is right
+                    (lower_col, Arc::clone(right_key))
+                }
+                AggregateSide::Right => {
+                    // fact is right, dim is left
+                    (Arc::clone(left_key), lower_col)
+                }
+            }
+        })
+        .collect();
+
+    let (new_left, new_right) = match agg_side {
+        AggregateSide::Left => (lower_agg_arc, dim_child),
+        AggregateSide::Right => (dim_child, lower_agg_arc),
+    };
+
+    let new_join = HashJoinExec::try_new(
+        Arc::clone(&new_left),
+        Arc::clone(&new_right),
+        new_join_on,
+        None, // no residual filter
+        &JoinType::Inner,
+        None, // no projection
+        *hash_join.partition_mode(),
+        NullEquality::NullEqualsNothing,
+        false,
+    )?;
+    let new_join_arc: Arc<dyn ExecutionPlan> = Arc::new(new_join);
+    let new_join_schema = new_join_arc.schema();
+
+    // Build the upper (Final) AggregateExec.
+    // Remap original group-by columns to the new join output schema.
+    let new_left_col_count = new_left.schema().fields().len();
+    let upper_group_exprs = match build_upper_group_exprs(
+        agg_exec,
+        agg_side,
+        left_col_count,
+        new_left_col_count,
+        &new_join_schema,
+        &lower_group_exprs,
+    )? {
+        Some(exprs) => exprs,
+        // Non-column GROUP BY expression: keep the original plan
+        None => {
+            return Ok(Transformed::no(
+                Arc::new(agg_exec.clone()) as Arc<dyn ExecutionPlan>
+            ));
+        }
+    };
+
+    let upper_group_by = PhysicalGroupBy::new_single(upper_group_exprs);
+
+    // For the upper aggregate in Final mode, we need aggregate expressions
+    // that reference the state columns in the new join output.
+    // The state columns from the lower Partial aggregate are in the lower_agg
+    // output after the group-by columns.
+    let upper_aggr_exprs = remap_aggregate_exprs_for_final(
+        &remapped_aggr_exprs,
+        agg_side,
+        new_left_col_count,
+        lower_group_exprs.len(),
+        &new_join_schema,
+    )?;
+
+    let upper_filter_exprs = vec![None; upper_aggr_exprs.len()];
+    let upper_mode = match agg_exec.mode() {
+        AggregateMode::SinglePartitioned => AggregateMode::FinalPartitioned,
+        _ => AggregateMode::Final,
+    };
+
+    let upper_agg = AggregateExec::try_new(
+        upper_mode,
+        upper_group_by,
+        upper_aggr_exprs,
+        upper_filter_exprs,
+        new_join_arc,
+        new_join_schema,
+    )?;
+
+    Ok(Transformed::yes(
+        Arc::new(upper_agg) as Arc<dyn ExecutionPlan>
+    ))
+}
+
+/// Remap aggregate expressions so their Column references point to the fact-side
+/// local schema (subtracting the offset for right-side columns).
+fn remap_aggregate_exprs_to_fact_side(
+    aggr_exprs: &[Arc<datafusion_physical_expr::aggregate::AggregateFunctionExpr>],
+    fact_col_offset: usize,
+    fact_schema: &arrow::datatypes::SchemaRef,
+) -> Result<Vec<Arc<datafusion_physical_expr::aggregate::AggregateFunctionExpr>>> {
+    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+
+    let mut result = Vec::with_capacity(aggr_exprs.len());
+    for aggr in aggr_exprs.iter() {
+        let new_args: Vec<Arc<dyn PhysicalExpr>> = aggr
+            .expressions()
+            .into_iter()
+            .map(|expr| {
+                if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                    let new_idx = col.index() - fact_col_offset;
+                    let field_name = fact_schema.field(new_idx).name();
+                    Arc::new(Column::new(field_name, new_idx)) as Arc<dyn PhysicalExpr>
+                } else {
+                    expr
+                }
+            })
+            .collect();
+
+        let fun: Arc<datafusion_expr::AggregateUDF> = Arc::new(aggr.fun().clone());
+        let builder = AggregateExprBuilder::new(fun, new_args)
+            .schema(Arc::clone(fact_schema))
+            .alias(aggr.name().to_string());
+
+        result.push(builder.build().map(Arc::new)?);
+    }
+    Ok(result)
+}
+
+/// Build upper group-by expressions remapped to the new join output schema.
+/// Returns `None` if a group-by expression is not a simple column (the caller
+/// should then keep the original plan).
+fn build_upper_group_exprs(
+    agg_exec: &AggregateExec,
+    agg_side: AggregateSide,
+    original_left_col_count: usize,
+    new_left_col_count: usize,
+    new_join_schema: &arrow::datatypes::SchemaRef,
+    lower_group_exprs: &[(Arc<dyn PhysicalExpr>, String)],
+) -> Result<Option<Vec<(Arc<dyn PhysicalExpr>, String)>>> {
+    let mut upper_exprs = Vec::new();
+
+    for (expr, alias) in agg_exec.group_expr().expr() {
+        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+            let orig_idx = col.index();
+            let is_fact_side = match agg_side {
+                AggregateSide::Left => orig_idx < original_left_col_count,
+                AggregateSide::Right => orig_idx >= original_left_col_count,
+            };
+
+            if is_fact_side {
+                // This group column is from the fact side: find it in the lower
+                // aggregate's group-by output
+                let fact_local_idx = orig_idx
+                    - match agg_side {
+                        AggregateSide::Left => 0,
+                        AggregateSide::Right => original_left_col_count,
+                    };
+
+                // Find this column in lower_group_exprs
+                let lower_pos = lower_group_exprs.iter().position(|(e, _)| {
+                    e.as_any()
+                        .downcast_ref::<Column>()
+                        .is_some_and(|c| c.index() == fact_local_idx)
+                });
+
+                if let Some(pos) = lower_pos {
+                    // In the new join, the lower aggregate is on one side
+                    let new_idx = match agg_side {
+                        AggregateSide::Left => pos,
+                        AggregateSide::Right => new_left_col_count + pos,
+                    };
+                    let field_name = new_join_schema.field(new_idx).name();
+                    upper_exprs.push((
+                        Arc::new(Column::new(field_name, new_idx)) as _,
+                        alias.clone(),
+                    ));
+                } else {
+                    // Fact-side group column missing from the lower aggregate;
+                    // should not happen, but bail out defensively
+                    return Ok(None);
+                }
+            } else {
+                // This group column is from the dimension side
+                let dim_local_idx = match agg_side {
+                    AggregateSide::Left => orig_idx - original_left_col_count,
+                    AggregateSide::Right => orig_idx,
+                };
+
+                let new_idx = match agg_side {
+                    AggregateSide::Left => new_left_col_count + dim_local_idx,
+                    AggregateSide::Right => dim_local_idx,
+                };
+                let field_name = new_join_schema.field(new_idx).name();
+                upper_exprs.push((
+                    Arc::new(Column::new(field_name, new_idx)) as _,
+                    alias.clone(),
+                ));
+            }
+        } else {
+            // Non-column group-by expression: bail out
+            return Ok(None);
+        }
+    }
+
+    Ok(Some(upper_exprs))
+}
+
+/// Remap aggregate expressions for the Final mode above the join.
+/// The Final aggregate reads state columns from the lower Partial aggregate's output,
+/// which are now accessible through the join output schema.
+fn remap_aggregate_exprs_for_final(
+    lower_aggr_exprs: &[Arc<
+        datafusion_physical_expr::aggregate::AggregateFunctionExpr,
+    >],
+    agg_side: AggregateSide,
+    new_left_col_count: usize,
+    num_lower_group_cols: usize,
+    new_join_schema: &arrow::datatypes::SchemaRef,
+) -> Result<Vec<Arc<datafusion_physical_expr::aggregate::AggregateFunctionExpr>>> {
+    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
+
+    // In the new join output, state columns from the lower aggregate
+    // start after the group-by columns on the fact side of the join.
+    let fact_side_offset = match agg_side {
+        AggregateSide::Left => 0,
+        AggregateSide::Right => new_left_col_count,
+    };
+
+    let mut state_col_start = fact_side_offset + num_lower_group_cols;
+    let mut result = Vec::with_capacity(lower_aggr_exprs.len());
+
+    for aggr in lower_aggr_exprs.iter() {
+        // Each aggregate in Partial mode outputs state_fields() columns.
+        let state_fields = aggr.state_fields()?;
+        let num_state_cols = state_fields.len();
+
+        // We reuse the same aggregate function for the upper (Final) aggregate,
+        // pointing its input expressions at where the state lives in the join
+        // output. In Final mode the executor reads state columns positionally
+        // (after the group-by columns in its input schema, per state_fields()),
+        // so passing the same function with remapped inputs is sufficient.
+        let new_args: Vec<Arc<dyn PhysicalExpr>> = (0..num_state_cols)
+            .map(|i| {
+                let col_idx = state_col_start + i;
+                let field_name = new_join_schema.field(col_idx).name();
+                Arc::new(Column::new(field_name, col_idx)) as Arc<dyn PhysicalExpr>
+            })
+            .collect();
+
+        state_col_start += num_state_cols;
+
+        let fun: Arc<datafusion_expr::AggregateUDF> = Arc::new(aggr.fun().clone());
+        let builder = AggregateExprBuilder::new(fun, new_args)
+            .schema(Arc::clone(new_join_schema))
+            .alias(aggr.name().to_string());
+
+        result.push(builder.build().map(Arc::new)?);
+    }
+
+    Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_eager_aggregation_rule_name() {
+        let rule = EagerAggregation::new();
+        assert_eq!(rule.name(), "EagerAggregation");
+        assert!(rule.schema_check());
+    }
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 5fac8948b7f04..5a29012fb7c4f 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -27,6 +27,7 @@
 pub mod aggregate_statistics;
 pub mod combine_partial_final_agg;
+pub mod eager_aggregation;
 pub mod enforce_distribution;
 pub mod enforce_sorting;
 pub mod ensure_coop;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index 05df642f8446b..fe9fa85c084c5 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 
 use crate::aggregate_statistics::AggregateStatistics;
 use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
+use crate::eager_aggregation::EagerAggregation;
 use crate::enforce_distribution::EnforceDistribution;
 use crate::enforce_sorting::EnforceSorting;
 use crate::ensure_coop::EnsureCooperative;
@@ -160,6 +161,11 @@ impl PhysicalOptimizer {
             // repartitioning and local sorting steps to meet distribution and ordering requirements.
             // Therefore, it should run before EnforceDistribution and EnforceSorting.
             Arc::new(JoinSelection::new()),
+            // The EagerAggregation rule pushes partial aggregations below joins when the
+            // aggregate's fact-side input can be significantly reduced by pre-aggregating
+            // on the join key. Must run after JoinSelection (join types finalized) and
+            // before EnforceDistribution (which adds repartitioning).
+            Arc::new(EagerAggregation::new()),
             // The LimitedDistinctAggregation rule should be applied before the EnforceDistribution rule,
             // as that rule may inject other operations in between the different AggregateExecs.
             // Applying the rule early means only directly-connected AggregateExecs must be examined.
diff --git a/datafusion/sqllogictest/test_files/eager_aggregation.slt b/datafusion/sqllogictest/test_files/eager_aggregation.slt
new file mode 100644
index 0000000000000..e1d7f624c935e
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/eager_aggregation.slt
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for the EagerAggregation physical optimizer rule
+
+# Create test tables simulating a star schema (fact + dimension)
+statement ok
+CREATE TABLE fact_table (
+  dim_key INT,
+  measure1 INT,
+  measure2 DOUBLE
+) AS VALUES
+(1, 10, 1.0), (1, 20, 2.0), (1, 30, 3.0),
+(2, 40, 4.0), (2, 50, 5.0),
+(3, 60, 6.0), (3, 70, 7.0), (3, 80, 8.0), (3, 90, 9.0);
+
+statement ok
+CREATE TABLE dim_table (
+  id INT,
+  name VARCHAR,
+  category VARCHAR
+) AS VALUES
+(1, 'alpha', 'A'),
+(2, 'beta', 'B'),
+(3, 'gamma', 'A');
+
+# Basic test: SUM aggregate over inner join with GROUP BY on dimension columns
+query TIR rowsort
+SELECT d.name, SUM(f.measure1), SUM(f.measure2)
+FROM fact_table f
+INNER JOIN dim_table d ON f.dim_key = d.id
+GROUP BY d.name;
+----
+alpha 60 6
+beta 90 9
+gamma 300 30
+
+# Test: COUNT aggregate
+query TI rowsort
+SELECT d.category, COUNT(f.measure1)
+FROM fact_table f
+INNER JOIN dim_table d ON f.dim_key = d.id
+GROUP BY d.category;
+----
+A 7
+B 2
+
+# Test: MIN/MAX aggregates
+query TIIR rowsort
+SELECT d.name, MIN(f.measure1), MAX(f.measure1), MAX(f.measure2)
+FROM fact_table f
+INNER JOIN dim_table d ON f.dim_key = d.id
+GROUP BY d.name;
+----
+alpha 10 30 3
+beta 40 50 5
+gamma 60 90 9
+
+# Test: Multiple aggregates combined
+query TIIRI rowsort
+SELECT d.category, SUM(f.measure1), MIN(f.measure1), MAX(f.measure2), COUNT(*)
+FROM fact_table f
+INNER JOIN dim_table d ON f.dim_key = d.id
+GROUP BY d.category;
+----
+A 360 10 9 7
+B 90 40 5 2
+
+# Test: Verify optimization is disabled for DISTINCT (should still produce correct results)
+query TI rowsort
+SELECT d.name, COUNT(DISTINCT f.measure1)
+FROM fact_table f
+INNER JOIN dim_table d ON f.dim_key = d.id
+GROUP BY d.name;
+----
+alpha 3
+beta 2
+gamma 4
+
+# Test: Verify LEFT JOIN does NOT trigger optimization (correct results still)
+query TIR rowsort
+SELECT d.name, SUM(f.measure1), SUM(f.measure2)
+FROM dim_table d
+LEFT JOIN fact_table f ON d.id = f.dim_key
+GROUP BY d.name;
+----
+alpha 60 6
+beta 90 9
+gamma 300 30
+
+# Test: Verify optimization works with WHERE clause on dimension
+query TIR rowsort
+SELECT d.name, SUM(f.measure1), SUM(f.measure2)
+FROM fact_table f
+INNER JOIN dim_table d ON f.dim_key = d.id
+WHERE d.category = 'A'
+GROUP BY d.name;
+----
+alpha 60 6
+gamma 300 30
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 2e8a65385541e..c2580a16807b3 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -233,6 +233,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
+physical_plan after EagerAggregation SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
 physical_plan after FilterPushdown SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
@@ -314,6 +315,7 @@ physical_plan after OutputRequirements
 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=Exact(32)),(Col[5]: ScanBytes=Exact(64)),(Col[6]: ScanBytes=Exact(32)),(Col[7]: ScanBytes=Exact(64)),(Col[8]: ScanBytes=Inexact(88)),(Col[9]: ScanBytes=Inexact(49)),(Col[10]: ScanBytes=Exact(64))]]
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
+physical_plan after EagerAggregation SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
 physical_plan after FilterPushdown SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
@@ -361,6 +363,7 @@ physical_plan after OutputRequirements
 03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
+physical_plan after EagerAggregation SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
 physical_plan after FilterPushdown SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
@@ -608,6 +611,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 physical_plan after aggregate_statistics SAME TEXT AS ABOVE
 physical_plan after join_selection SAME TEXT AS ABOVE
+physical_plan after EagerAggregation SAME TEXT AS ABOVE
 physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
 physical_plan after FilterPushdown SAME TEXT AS ABOVE
 physical_plan after EnforceDistribution SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 8396a60137ee1..f7bf208d6c803 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -297,6 +297,7 @@ datafusion.format.timestamp_tz_format NULL
 datafusion.format.types_info false
 datafusion.optimizer.allow_symmetric_joins_without_pruning true
 datafusion.optimizer.default_filter_selectivity 20
+datafusion.optimizer.eager_aggregation true datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true @@ -444,6 +445,7 @@ datafusion.format.timestamp_tz_format NULL Timestamp format for timestamp with t datafusion.format.types_info false Show types in visual representation batches datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). +datafusion.optimizer.eager_aggregation true When set to true, the physical plan optimizer will attempt to push partial aggregations below joins when this reduces the number of rows flowing into the join (eager aggregation). datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. 
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index f7f9426e2bd32..5e5c9c1f503e7 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -162,6 +162,7 @@ The following configuration settings are available: | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | | datafusion.optimizer.join_reordering | true | When set to true, the physical plan optimizer may swap join inputs based on statistics. When set to false, statistics-driven join input reordering is disabled and the original join order in the query is used. | | datafusion.optimizer.use_statistics_registry | false | When set to true, the physical plan optimizer uses the pluggable `StatisticsRegistry` for statistics propagation across operators. This enables more accurate cardinality estimates compared to each operator's built-in `partition_statistics`. 
| +| datafusion.optimizer.eager_aggregation | true | When set to true, the physical plan optimizer will attempt to push partial aggregations below joins when this reduces the number of rows flowing into the join (eager aggregation). | | datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | | datafusion.optimizer.enable_piecewise_merge_join | false | When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. | | datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
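
For local verification, the new flag can be toggled per session like any other optimizer setting. A sketch of a follow-up check (the `EXPLAIN` assertion is omitted here because the exact plan text depends on the data source; with the rule disabled, the partial aggregate below the join should no longer appear):

```sql
-- Disable the rule for the current session.
SET datafusion.optimizer.eager_aggregation = false;

-- Re-run the grouped-join query from the test file above;
-- results must be identical with the rule on or off.
SELECT d.name, SUM(f.measure1)
FROM fact_table f
INNER JOIN dim_table d ON f.dim_key = d.id
GROUP BY d.name;

-- Restore the default introduced by this change.
SET datafusion.optimizer.eager_aggregation = true;
```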