diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index b1aa850284aee..6af727777a42c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -922,18 +922,22 @@ impl DefaultPhysicalPlanner { input_exec }; - let initial_aggr = Arc::new(AggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates, - filters.clone(), - input_exec, - Arc::clone(&physical_input_schema), - )?); - let can_repartition = !groups.is_empty() && session_state.config().target_partitions() > 1 && session_state.config().repartition_aggregations(); + let require_single_output_partition = !can_repartition; + + let initial_aggr = Arc::new( + AggregateExec::try_new_with_require_single_output_partition( + AggregateMode::Partial, + groups.clone(), + aggregates, + filters.clone(), + input_exec, + Arc::clone(&physical_input_schema), + require_single_output_partition, + )?, + ); // Some aggregators may be modified during initialization for // optimization purposes. For example, a FIRST_VALUE may turn @@ -942,24 +946,16 @@ impl DefaultPhysicalPlanner { // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. let updated_aggregates = initial_aggr.aggr_expr().to_vec(); - let next_partition_mode = if can_repartition { - // construct a second aggregation with 'AggregateMode::FinalPartitioned' - AggregateMode::FinalPartitioned - } else { - // construct a second aggregation, keeping the final column name equal to the - // first aggregation and the expressions corresponding to the respective aggregate - AggregateMode::Final - }; - let final_grouping_set = initial_aggr.group_expr().as_final(); - Arc::new(AggregateExec::try_new( - next_partition_mode, + Arc::new(AggregateExec::try_new_with_require_single_output_partition( + AggregateMode::Final, final_grouping_set, updated_aggregates, filters, initial_aggr, Arc::clone(&physical_input_schema), + require_single_output_partition, )?) } LogicalPlan::Projection(Projection { input, expr, .. }) => self @@ -3436,9 +3432,11 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let formatted = format!("{execution_plan:?}"); - // Make sure the plan contains a FinalPartitioned, which means it will not use the Final - // mode in Aggregate (which is slower) - assert!(formatted.contains("FinalPartitioned")); + // Make sure the plan uses hash-partitioned input for the final aggregate. + assert!( + formatted.contains("FinalPartitioned") + || formatted.contains("HashPartitioned") + ); Ok(()) } @@ -3467,9 +3465,11 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let formatted = format!("{execution_plan:?}"); - // Make sure the plan contains a FinalPartitioned, which means it will not use the Final - // mode in Aggregate (which is slower) - assert!(formatted.contains("FinalPartitioned")); + // Make sure the plan uses hash-partitioned input for the final aggregate. + assert!( + formatted.contains("FinalPartitioned") + || formatted.contains("HashPartitioned") + ); Ok(()) } @@ -3488,9 +3488,11 @@ mod tests { let execution_plan = plan(&logical_plan).await?; let formatted = format!("{execution_plan:?}"); - // Make sure the plan contains a FinalPartitioned, which means it will not use the Final - // mode in Aggregate (which is slower) - assert!(formatted.contains("FinalPartitioned")); + // Make sure the plan uses hash-partitioned input for the final aggregate. + assert!( + formatted.contains("FinalPartitioned") + || formatted.contains("HashPartitioned") + ); Ok(()) } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 94ae82a9ad755..97fd1225a387c 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -328,23 +328,25 @@ fn aggregate_exec_with_alias( let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr); Arc::new( - AggregateExec::try_new( - AggregateMode::FinalPartitioned, + AggregateExec::try_new_with_require_single_output_partition( + AggregateMode::Final, final_grouping, vec![], vec![], Arc::new( - AggregateExec::try_new( + AggregateExec::try_new_with_require_single_output_partition( AggregateMode::Partial, group_by, vec![], vec![], input, schema.clone(), + false, ) .unwrap(), ), schema, + false, ) .unwrap(), ) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index b33305c23ede2..707d6907d9730 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -882,7 +882,7 @@ mod test { .await?; let agg_final = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + AggregateMode::Final, group_by.clone(), aggr_expr.clone(), vec![None], @@ -954,7 +954,7 @@ mod test { )?); let agg_final = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + AggregateMode::Final, group_by.clone(), aggr_expr.clone(), vec![None], diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index 6d8e7995c18c2..821c8addc2bc8 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -58,10 +58,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { return Ok(Transformed::no(plan)); }; - if !matches!( - agg_exec.mode(), - AggregateMode::Final | AggregateMode::FinalPartitioned - ) { + if !matches!(agg_exec.mode(), AggregateMode::Final) { return Ok(Transformed::no(plan)); } @@ -85,13 +82,9 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { input_agg_exec.filter_expr(), ), ) { - let mode = if agg_exec.mode() == &AggregateMode::Final { - AggregateMode::Single - } else { - AggregateMode::SinglePartitioned - }; - AggregateExec::try_new( - mode, + AggregateExec::try_new_with_settings_from( + agg_exec, + AggregateMode::Single, input_agg_exec.group_expr().clone(), input_agg_exec.aggr_expr().to_vec(), input_agg_exec.filter_expr().to_vec(), diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index acb1c588097ee..073301f0e84f1 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -393,7 +393,12 @@ pub fn adjust_input_keys_ordering( .map(Transformed::yes); } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::() { if !requirements.data.is_empty() { - if aggregate_exec.mode() == &AggregateMode::FinalPartitioned { + if aggregate_exec.mode() == &AggregateMode::Final + && matches!( + aggregate_exec.required_input_distribution().first(), + Some(Distribution::HashPartitioned(_)) + ) + { return reorder_aggregate_keys(requirements, aggregate_exec) .map(Transformed::yes); } else { @@ -504,7 +509,8 @@ pub fn reorder_aggregate_keys( .into_iter() .map(|idx| group_exprs[idx].clone()) .collect(); - let partial_agg = Arc::new(AggregateExec::try_new( + let partial_agg = Arc::new(AggregateExec::try_new_with_settings_from( + agg_exec, AggregateMode::Partial, PhysicalGroupBy::new_single(new_group_exprs), agg_exec.aggr_expr().to_vec(), @@ -523,8 +529,9 @@ pub fn reorder_aggregate_keys( .map(|(idx, expr)| (expr, group_exprs[idx].1.clone())) .collect(), ); - let new_final_agg = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + let new_final_agg = Arc::new(AggregateExec::try_new_with_settings_from( + agg_exec, + AggregateMode::Final, new_group_by, agg_exec.aggr_expr().to_vec(), agg_exec.filter_expr().to_vec(), diff --git a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs index fe9636f67619b..175cbc0a0df07 100644 --- a/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs +++ b/datafusion/physical-optimizer/src/limited_distinct_aggregation.rs @@ -20,7 +20,8 @@ use std::sync::Arc; -use datafusion_physical_plan::aggregates::{AggregateExec, LimitOptions}; +use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, LimitOptions}; +use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; @@ -54,7 +55,8 @@ impl LimitedDistinctAggregation { } // We found what we want: clone, copy the limit down, and return modified node - let new_aggr = AggregateExec::try_new( + let new_aggr = AggregateExec::try_new_with_settings_from( + aggr, *aggr.mode(), aggr.group_expr().clone(), aggr.aggr_expr().to_vec(), @@ -64,6 +66,54 @@ impl LimitedDistinctAggregation { ) .expect("Unable to copy Aggregate!") .with_limit_options(Some(LimitOptions::new(limit))); + + if matches!(aggr.mode(), AggregateMode::Final) + && let (child_plan, wrap_coalesce) = if let Some(coalesce) = aggr + .input() + .as_any() + .downcast_ref::() + { + (Arc::clone(coalesce.input()), true) + } else { + (Arc::clone(aggr.input()), false) + } + && let Some(child_agg) = child_plan.as_any().downcast_ref::() + && matches!(child_agg.mode(), AggregateMode::Partial) + && !child_agg.group_expr().has_grouping_set() + { + let new_child = AggregateExec::try_new_with_settings_from( + child_agg, + AggregateMode::Partial, + child_agg.group_expr().clone(), + child_agg.aggr_expr().to_vec(), + child_agg.filter_expr().to_vec(), + child_agg.input().to_owned(), + child_agg.input_schema(), + ) + .expect("Unable to copy Aggregate!") + .with_limit_options(Some(LimitOptions::new(limit))); + + let new_input: Arc = if wrap_coalesce { + Arc::new(CoalescePartitionsExec::new(Arc::new(new_child))) + } else { + Arc::new(new_child) + }; + + let rebuilt_final = AggregateExec::try_new_with_settings_from( + &new_aggr, + AggregateMode::Final, + new_aggr.group_expr().clone(), + new_aggr.aggr_expr().to_vec(), + new_aggr.filter_expr().to_vec(), + new_input, + new_aggr.input_schema(), + ) + .expect("Unable to copy Aggregate!") + .with_limit_options(Some(LimitOptions::new(limit))); + + return Some(Arc::new(rebuilt_final)); + } + Some(Arc::new(new_aggr)) } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index d645f5c55d434..d53ff25085ab8 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -95,22 +95,19 @@ const AGGREGATION_HASH_SEED: ahash::RandomState = /// aggregation and how these modes are used. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum AggregateMode { - /// One of multiple layers of aggregation, any input partitioning + /// One of multiple layers of aggregation, input values -> accumulator state. /// /// Partial aggregate that can be applied in parallel across input /// partitions. /// /// This is the first phase of a multi-phase aggregation. Partial, - /// *Final* of multiple layers of aggregation, in exactly one partition + /// *Final* of multiple layers of aggregation, accumulator state -> final value. /// - /// Final aggregate that produces a single partition of output by combining - /// the output of multiple partial aggregates. + /// Final aggregate that combines the output of multiple partial aggregates. /// /// This is the second phase of a multi-phase aggregation. /// - /// This mode requires that the input is a single partition - /// /// Note: Adjacent `Partial` and `Final` mode aggregation is equivalent to a `Single` /// mode aggregation node. The `Final` mode is required since this is used in an /// intermediate step. The [`CombinePartialFinalAggregate`] physical optimizer rule @@ -118,32 +115,12 @@ pub enum AggregateMode { /// /// [`CombinePartialFinalAggregate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/combine_partial_final_agg/struct.CombinePartialFinalAggregate.html Final, - /// *Final* of multiple layers of aggregation, input is *Partitioned* - /// - /// Final aggregate that works on pre-partitioned data. - /// - /// This mode requires that all rows with a particular grouping key are in - /// the same partitions, such as is the case with Hash repartitioning on the - /// group keys. If a group key is duplicated, duplicate groups would be - /// produced - FinalPartitioned, - /// *Single* layer of Aggregation, input is exactly one partition + /// *Single* layer of aggregation, input values -> final value. /// /// Applies the entire logical aggregation operation in a single operator, /// as opposed to Partial / Final modes which apply the logical aggregation using /// two operators. - /// - /// This mode requires that the input is a single partition (like Final) Single, - /// *Single* layer of Aggregation, input is *Partitioned* - /// - /// Applies the entire logical aggregation operation in a single operator, - /// as opposed to Partial / Final modes which apply the logical aggregation - /// using two operators. - /// - /// This mode requires that the input has more than one partition, and is - /// partitioned by group key (like FinalPartitioned). - SinglePartitioned, } impl AggregateMode { @@ -152,10 +129,8 @@ impl AggregateMode { /// `merge_batch` method will not be called for these modes. pub fn is_first_stage(&self) -> bool { match self { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => true, - AggregateMode::Final | AggregateMode::FinalPartitioned => false, + AggregateMode::Partial | AggregateMode::Single => true, + AggregateMode::Final => false, } } } @@ -539,7 +514,7 @@ impl LimitOptions { } /// Hash aggregate execution plan -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct AggregateExec { /// Aggregation mode (full, partial) mode: AggregateMode, @@ -566,6 +541,8 @@ pub struct AggregateExec { required_input_ordering: Option, /// Describes how the input is ordered relative to the group by columns input_order_mode: InputOrderMode, + /// Whether final/single aggregation should require a single input partition + require_single_output_partition: bool, cache: PlanProperties, /// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]), /// it is set to `Some(..)` regardless of whether it can be pushed down to a child node. @@ -576,6 +553,30 @@ pub struct AggregateExec { dynamic_filter: Option>, } +impl std::fmt::Debug for AggregateExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AggregateExec") + .field("mode", &self.mode_display()) + .field("group_by", &self.group_by) + .field("aggr_expr", &self.aggr_expr) + .field("filter_expr", &self.filter_expr) + .field("limit_options", &self.limit_options) + .field("input", &self.input) + .field("schema", &self.schema) + .field("input_schema", &self.input_schema) + .field("metrics", &self.metrics) + .field("required_input_ordering", &self.required_input_ordering) + .field("input_order_mode", &self.input_order_mode) + .field( + "require_single_output_partition", + &self.require_single_output_partition, + ) + .field("cache", &self.cache) + .field("dynamic_filter", &self.dynamic_filter) + .finish() + } +} + impl AggregateExec { /// Function used in `OptimizeAggregateOrder` optimizer rule, /// where we need parts of the new value, others cloned from the old one @@ -590,6 +591,7 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), + require_single_output_partition: self.require_single_output_partition, cache: self.cache.clone(), mode: self.mode, group_by: self.group_by.clone(), @@ -610,6 +612,7 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), + require_single_output_partition: self.require_single_output_partition, cache: self.cache.clone(), mode: self.mode, group_by: self.group_by.clone(), @@ -649,6 +652,50 @@ impl AggregateExec { ) } + /// Create a new [`AggregateExec`] with an explicit input partition requirement. + pub fn try_new_with_require_single_output_partition( + mode: AggregateMode, + group_by: PhysicalGroupBy, + aggr_expr: Vec>, + filter_expr: Vec>>, + input: Arc, + input_schema: SchemaRef, + require_single_output_partition: bool, + ) -> Result { + let mut exec = AggregateExec::try_new( + mode, + group_by, + aggr_expr, + filter_expr, + input, + input_schema, + )?; + exec.require_single_output_partition = require_single_output_partition; + Ok(exec) + } + + /// Create a new [`AggregateExec`] while reusing the input partitioning + /// requirement from an existing aggregate. + pub fn try_new_with_settings_from( + settings: &AggregateExec, + mode: AggregateMode, + group_by: PhysicalGroupBy, + aggr_expr: Vec>, + filter_expr: Vec>>, + input: Arc, + input_schema: SchemaRef, + ) -> Result { + Self::try_new_with_require_single_output_partition( + mode, + group_by, + aggr_expr, + filter_expr, + input, + input_schema, + settings.require_single_output_partition, + ) + } + /// Create a new hash aggregate execution plan with the given schema. /// This constructor isn't part of the public API, it is used internally /// by DataFusion to enforce schema consistency during when re-creating @@ -747,6 +794,7 @@ impl AggregateExec { required_input_ordering, limit_options: None, input_order_mode, + require_single_output_partition: true, cache, dynamic_filter: None, }; @@ -761,12 +809,41 @@ impl AggregateExec { &self.mode } + /// Display-friendly mode label that includes input partitioning. + fn mode_display(&self) -> &'static str { + let is_hash_partitioned = matches!( + self.required_input_distribution().first(), + Some(Distribution::HashPartitioned(_)) + ); + match (self.mode, is_hash_partitioned) { + (AggregateMode::Final, true) => "FinalPartitioned", + (AggregateMode::Single, true) => "SinglePartitioned", + (AggregateMode::Final, false) => "Final", + (AggregateMode::Single, false) => "Single", + (AggregateMode::Partial, _) => "Partial", + } + } + /// Set the limit options for this AggExec pub fn with_limit_options(mut self, limit_options: Option) -> Self { self.limit_options = limit_options; self } + /// Configure whether final/single aggregates should require a single input partition. + pub fn with_require_single_output_partition( + mut self, + require_single_output_partition: bool, + ) -> Self { + self.require_single_output_partition = require_single_output_partition; + self + } + + /// Returns whether final/single aggregation requires a single input partition. + pub fn require_single_output_partition(&self) -> bool { + self.require_single_output_partition + } + /// Get the limit options (if set) pub fn limit_options(&self) -> Option { self.limit_options @@ -974,9 +1051,7 @@ impl AggregateExec { column_statistics }; match self.mode { - AggregateMode::Final | AggregateMode::FinalPartitioned - if self.group_by.expr.is_empty() => - { + AggregateMode::Final if self.group_by.expr.is_empty() => { let total_byte_size = Self::calculate_scaled_byte_size(child_statistics, 1); @@ -1124,7 +1199,7 @@ impl DisplayAs for AggregateExec { } }; - write!(f, "AggregateExec: mode={:?}", self.mode)?; + write!(f, "AggregateExec: mode={}", self.mode_display())?; let g: Vec = if self.group_by.is_single() { self.group_by .expr @@ -1216,7 +1291,7 @@ impl DisplayAs for AggregateExec { .iter() .map(|agg| agg.human_display().to_string()) .collect(); - writeln!(f, "mode={:?}", self.mode)?; + writeln!(f, "mode={}", self.mode_display())?; if !g.is_empty() { writeln!(f, "group_by={}", g.join(", "))?; } @@ -1247,15 +1322,16 @@ impl ExecutionPlan for AggregateExec { } fn required_input_distribution(&self) -> Vec { - match &self.mode { - AggregateMode::Partial => { - vec![Distribution::UnspecifiedDistribution] - } - AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => { - vec![Distribution::HashPartitioned(self.group_by.input_exprs())] + match self.mode { + AggregateMode::Partial => vec![Distribution::UnspecifiedDistribution], + AggregateMode::Final | AggregateMode::Single + if self.require_single_output_partition + || self.group_by.expr.is_empty() => + { + vec![Distribution::SinglePartition] } AggregateMode::Final | AggregateMode::Single => { - vec![Distribution::SinglePartition] + vec![Distribution::HashPartitioned(self.group_by.input_exprs())] } } } @@ -1295,6 +1371,7 @@ impl ExecutionPlan for AggregateExec { Arc::clone(&self.schema), )?; me.limit_options = self.limit_options; + me.require_single_output_partition = self.require_single_output_partition; me.dynamic_filter = self.dynamic_filter.clone(); Ok(Arc::new(me)) @@ -1484,10 +1561,7 @@ fn create_schema( fields.extend(expr.state_fields()?.iter().cloned()); } } - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single - | AggregateMode::SinglePartitioned => { + AggregateMode::Final | AggregateMode::Single => { // in final mode, the field with the final result of the accumulator for expr in aggr_expr { fields.push(expr.field()) @@ -1697,9 +1771,7 @@ pub fn aggregate_expressions( col_idx_base: usize, ) -> Result>>> { match mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => Ok(aggr_expr + AggregateMode::Partial | AggregateMode::Single => Ok(aggr_expr .iter() .map(|agg| { let mut result = agg.expressions(); @@ -1711,7 +1783,7 @@ pub fn aggregate_expressions( }) .collect()), // In this mode, we build the merge expressions of the aggregation. - AggregateMode::Final | AggregateMode::FinalPartitioned => { + AggregateMode::Final => { let mut col_idx_base = col_idx_base; aggr_expr .iter() @@ -1754,7 +1826,7 @@ pub fn create_accumulators( } /// returns a vector of ArrayRefs, where each entry corresponds to either the -/// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial) +/// final value (mode = Final and Single) or states (mode = Partial) pub fn finalize_aggregation( accumulators: &mut [AccumulatorItem], mode: &AggregateMode, @@ -1774,10 +1846,7 @@ pub fn finalize_aggregation( .flatten_ok() .collect() } - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single - | AggregateMode::SinglePartitioned => { + AggregateMode::Final | AggregateMode::Single => { // Merge the state to the final value accumulators .iter_mut() @@ -3106,7 +3175,7 @@ mod tests { None, )?; let aggregate_exec = Arc::new(AggregateExec::try_new( - AggregateMode::FinalPartitioned, + AggregateMode::Final, group_by, aggr_expr, vec![None], diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb27..dc6e2c5206d10 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -283,12 +283,8 @@ impl AggregateStream { let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?; let filter_expressions = match agg.mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, - AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] - } + AggregateMode::Partial | AggregateMode::Single => agg_filter_expr, + AggregateMode::Final => vec![None; agg.aggr_expr.len()], }; let accumulators = create_accumulators(&agg.aggr_expr)?; @@ -456,12 +452,10 @@ fn aggregate_batch( // 1.4 let size_pre = accum.size(); let res = match mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => accum.update_batch(&values), - AggregateMode::Final | AggregateMode::FinalPartitioned => { - accum.merge_batch(&values) + AggregateMode::Partial | AggregateMode::Single => { + accum.update_batch(&values) } + AggregateMode::Final => accum.merge_batch(&values), }; let size_post = accum.size(); allocated += size_post.saturating_sub(size_pre); diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 49ce125e739b3..fba06b0788bed 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -492,12 +492,8 @@ impl GroupedHashAggregateStream { )?; let filter_expressions = match agg.mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, - AggregateMode::Final | AggregateMode::FinalPartitioned => { - vec![None; agg.aggr_expr.len()] - } + AggregateMode::Partial | AggregateMode::Single => agg_filter_expr, + AggregateMode::Final => vec![None; agg.aggr_expr.len()], }; // Instantiate the accumulators @@ -983,9 +979,7 @@ impl GroupedHashAggregateStream { // Call the appropriate method on each aggregator with // the entire input row and the relevant group indexes match self.mode { - AggregateMode::Partial - | AggregateMode::Single - | AggregateMode::SinglePartitioned + AggregateMode::Partial | AggregateMode::Single if !self.spill_state.is_stream_merging => { acc.update_batch( @@ -1099,10 +1093,9 @@ impl GroupedHashAggregateStream { // merged and re-evaluated later. output.extend(acc.state(emit_to)?) } - AggregateMode::Final - | AggregateMode::FinalPartitioned - | AggregateMode::Single - | AggregateMode::SinglePartitioned => output.push(acc.evaluate(emit_to)?), + AggregateMode::Final | AggregateMode::Single => { + output.push(acc.evaluate(emit_to)?) + } } } drop(timer); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 810ec6d1f17a3..707041d8363c7 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1252,6 +1252,7 @@ message AggregateExecNode { repeated MaybeFilter filter_expr = 10; AggLimit limit = 11; bool has_grouping_set = 12; + bool require_single_output_partition = 13; } message GlobalLimitExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 7ed20785ab384..6edf0f585827b 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -154,6 +154,9 @@ impl serde::Serialize for AggregateExecNode { if self.has_grouping_set { len += 1; } + if self.require_single_output_partition { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExecNode", len)?; if !self.group_expr.is_empty() { struct_ser.serialize_field("groupExpr", &self.group_expr)?; @@ -193,6 +196,12 @@ impl serde::Serialize for AggregateExecNode { if self.has_grouping_set { struct_ser.serialize_field("hasGroupingSet", &self.has_grouping_set)?; } + if self.require_single_output_partition { + struct_ser.serialize_field( + "requireSingleOutputPartition", + &self.require_single_output_partition, + )?; + } struct_ser.end() } } @@ -223,6 +232,10 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "limit", "has_grouping_set", "hasGroupingSet", + "require_single_output_partition", + "requireSingleOutputPartition", + "repartition_aggregations", + "repartitionAggregations", ]; #[allow(clippy::enum_variant_names)] @@ -239,6 +252,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { FilterExpr, Limit, HasGroupingSet, + RequireSingleOutputPartition, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -272,6 +286,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { "filterExpr" | "filter_expr" => Ok(GeneratedField::FilterExpr), "limit" => Ok(GeneratedField::Limit), "hasGroupingSet" | "has_grouping_set" => Ok(GeneratedField::HasGroupingSet), + "requireSingleOutputPartition" | "require_single_output_partition" => Ok(GeneratedField::RequireSingleOutputPartition), + "repartitionAggregations" | "repartition_aggregations" => Ok(GeneratedField::RequireSingleOutputPartition), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -303,6 +319,7 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { let mut filter_expr__ = None; let mut limit__ = None; let mut has_grouping_set__ = None; + let mut require_single_output_partition__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::GroupExpr => { @@ -377,6 +394,15 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { } has_grouping_set__ = Some(map_.next_value()?); } + GeneratedField::RequireSingleOutputPartition => { + if require_single_output_partition__.is_some() { + return Err(serde::de::Error::duplicate_field( + "requireSingleOutputPartition", + )); + } + require_single_output_partition__ = + Some(map_.next_value()?); + } } } Ok(AggregateExecNode { @@ -392,6 +418,8 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode { filter_expr: filter_expr__.unwrap_or_default(), limit: limit__, has_grouping_set: has_grouping_set__.unwrap_or_default(), + require_single_output_partition: require_single_output_partition__ + .unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 0c9320c77892b..caa392d4a050a 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1871,6 +1871,8 @@ pub struct AggregateExecNode { pub limit: ::core::option::Option, #[prost(bool, tag = "12")] pub has_grouping_set: bool, + #[prost(bool, tag = "13")] + pub require_single_output_partition: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GlobalLimitExecNode { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index ca213bc722a17..931e8cb3332f7 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -48,7 +48,9 @@ use datafusion_functions_table::generate_series::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; -use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; +use datafusion_physical_expr::{ + LexOrdering, LexRequirement, Partitioning, PhysicalExprRef, +}; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, }; @@ -78,7 +80,9 @@ use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeE use datafusion_physical_plan::union::{InterleaveExec, UnionExec}; use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec}; use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr}; +use datafusion_physical_plan::{ + ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PhysicalExpr, WindowExpr, +}; use prost::Message; use prost::bytes::BufMut; @@ -1078,14 +1082,12 @@ impl protobuf::PhysicalPlanNode { hash_agg.mode )) })?; - let agg_mode: AggregateMode = match mode { + let agg_mode = match mode { protobuf::AggregateMode::Partial => AggregateMode::Partial, - protobuf::AggregateMode::Final => AggregateMode::Final, - protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned, - protobuf::AggregateMode::Single => AggregateMode::Single, - protobuf::AggregateMode::SinglePartitioned => { - AggregateMode::SinglePartitioned - } + protobuf::AggregateMode::Final + | protobuf::AggregateMode::FinalPartitioned => AggregateMode::Final, + protobuf::AggregateMode::Single + | protobuf::AggregateMode::SinglePartitioned => AggregateMode::Single, }; let num_expr = hash_agg.group_expr.len(); @@ -1220,13 +1222,14 @@ impl protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let agg = AggregateExec::try_new( + let agg = AggregateExec::try_new_with_require_single_output_partition( agg_mode, PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set), physical_aggr_expr, physical_filter_expr, input, physical_schema, + hash_agg.require_single_output_partition, )?; let agg = if let Some(limit_proto) = &hash_agg.limit { @@ -2669,14 +2672,16 @@ impl protobuf::PhysicalPlanNode { .map(|expr| expr.name().to_string()) .collect::>(); - let agg_mode = match exec.mode() { - AggregateMode::Partial => protobuf::AggregateMode::Partial, - AggregateMode::Final => protobuf::AggregateMode::Final, - AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned, - AggregateMode::Single => protobuf::AggregateMode::Single, - AggregateMode::SinglePartitioned => { + let agg_mode = match (exec.mode(), exec.input().output_partitioning()) { + (AggregateMode::Partial, _) => protobuf::AggregateMode::Partial, + (AggregateMode::Final, Partitioning::Hash(_, _)) => { + protobuf::AggregateMode::FinalPartitioned + } + (AggregateMode::Final, _) => protobuf::AggregateMode::Final, + (AggregateMode::Single, Partitioning::Hash(_, _)) => { protobuf::AggregateMode::SinglePartitioned } + (AggregateMode::Single, _) => protobuf::AggregateMode::Single, }; let input_schema = exec.input_schema(); let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter( @@ -2719,6 +2724,8 @@ impl protobuf::PhysicalPlanNode { groups, limit, has_grouping_set: exec.group_expr().has_grouping_set(), + require_single_output_partition: exec + .require_single_output_partition(), }, ))), })