60 changes: 31 additions & 29 deletions datafusion/core/src/physical_planner.rs
@@ -922,18 +922,22 @@ impl DefaultPhysicalPlanner {
input_exec
};

let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates,
filters.clone(),
input_exec,
Arc::clone(&physical_input_schema),
)?);

let can_repartition = !groups.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();
let require_single_output_partition = !can_repartition;

let initial_aggr = Arc::new(
AggregateExec::try_new_with_require_single_output_partition(
AggregateMode::Partial,
groups.clone(),
aggregates,
filters.clone(),
input_exec,
Arc::clone(&physical_input_schema),
require_single_output_partition,
)?,
);

// Some aggregators may be modified during initialization for
// optimization purposes. For example, a FIRST_VALUE may turn
@@ -942,24 +946,16 @@
// `AggregateFunctionExpr`/`PhysicalSortExpr` objects.
let updated_aggregates = initial_aggr.aggr_expr().to_vec();

let next_partition_mode = if can_repartition {
// construct a second aggregation with 'AggregateMode::FinalPartitioned'
AggregateMode::FinalPartitioned
} else {
// construct a second aggregation, keeping the final column name equal to the
// first aggregation and the expressions corresponding to the respective aggregate
AggregateMode::Final
};

let final_grouping_set = initial_aggr.group_expr().as_final();

Arc::new(AggregateExec::try_new(
next_partition_mode,
Arc::new(AggregateExec::try_new_with_require_single_output_partition(
AggregateMode::Final,
final_grouping_set,
updated_aggregates,
filters,
initial_aggr,
Arc::clone(&physical_input_schema),
require_single_output_partition,
)?)
}
LogicalPlan::Projection(Projection { input, expr, .. }) => self
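Note on the constructor used above: AggregateExec::try_new_with_require_single_output_partition is introduced by this PR and its signature is not shown in the diff. A minimal sketch of what it presumably looks like, assuming it takes the same arguments as try_new plus the new flag and stores the flag on the node (the field name and the body below are assumptions, not the actual implementation):

    impl AggregateExec {
        /// Sketch only: like try_new, but also records whether this aggregate must
        /// receive all of its input in a single partition rather than being satisfied
        /// by hash-repartitioning on the group keys.
        pub fn try_new_with_require_single_output_partition(
            mode: AggregateMode,
            group_by: PhysicalGroupBy,
            aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
            filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
            input: Arc<dyn ExecutionPlan>,
            input_schema: SchemaRef,
            require_single_output_partition: bool,
        ) -> Result<Self> {
            let mut exec =
                Self::try_new(mode, group_by, aggr_expr, filter_expr, input, input_schema)?;
            // Assumed field; the requirement would later surface through
            // required_input_distribution() instead of a dedicated AggregateMode.
            exec.require_single_output_partition = require_single_output_partition;
            Ok(exec)
        }
    }

The planner still builds the usual Partial/Final pair, but both nodes carry the same single-partition requirement instead of the planner choosing between AggregateMode::Final and AggregateMode::FinalPartitioned.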
@@ -3436,9 +3432,11 @@ mod tests {
let execution_plan = plan(&logical_plan).await?;
let formatted = format!("{execution_plan:?}");

// Make sure the plan contains a FinalPartitioned, which means it will not use the Final
// mode in Aggregate (which is slower)
assert!(formatted.contains("FinalPartitioned"));
// Make sure the plan uses hash-partitioned input for the final aggregate.
assert!(
formatted.contains("FinalPartitioned")
|| formatted.contains("HashPartitioned")
);

Ok(())
}
@@ -3467,9 +3465,11 @@
let execution_plan = plan(&logical_plan).await?;
let formatted = format!("{execution_plan:?}");

// Make sure the plan contains a FinalPartitioned, which means it will not use the Final
// mode in Aggregate (which is slower)
assert!(formatted.contains("FinalPartitioned"));
// Make sure the plan uses hash-partitioned input for the final aggregate.
assert!(
formatted.contains("FinalPartitioned")
|| formatted.contains("HashPartitioned")
);
Ok(())
}

@@ -3488,9 +3488,11 @@
let execution_plan = plan(&logical_plan).await?;
let formatted = format!("{execution_plan:?}");

// Make sure the plan contains a FinalPartitioned, which means it will not use the Final
// mode in Aggregate (which is slower)
assert!(formatted.contains("FinalPartitioned"));
// Make sure the plan uses hash-partitioned input for the final aggregate.
assert!(
formatted.contains("FinalPartitioned")
|| formatted.contains("HashPartitioned")
);

Ok(())
}
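The same two-branch assertion now appears in three tests above. If it spreads further, it could be factored into a small helper along these lines (the helper name is illustrative, not part of this PR):

    /// Assert that the plan's final aggregate consumes hash-partitioned input, whether
    /// the plan prints the legacy FinalPartitioned mode or a HashPartitioned distribution.
    fn assert_final_aggregate_is_hash_partitioned(formatted: &str) {
        assert!(
            formatted.contains("FinalPartitioned") || formatted.contains("HashPartitioned"),
            "expected a hash-partitioned final aggregate in plan:\n{formatted}"
        );
    }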
@@ -328,23 +328,25 @@ fn aggregate_exec_with_alias(
let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr);

Arc::new(
AggregateExec::try_new(
AggregateMode::FinalPartitioned,
AggregateExec::try_new_with_require_single_output_partition(
AggregateMode::Final,
final_grouping,
vec![],
vec![],
Arc::new(
AggregateExec::try_new(
AggregateExec::try_new_with_require_single_output_partition(
AggregateMode::Partial,
group_by,
vec![],
vec![],
input,
schema.clone(),
false,
)
.unwrap(),
),
schema,
false,
)
.unwrap(),
)
@@ -882,7 +882,7 @@ mod test {
.await?;

let agg_final = Arc::new(AggregateExec::try_new(
AggregateMode::FinalPartitioned,
AggregateMode::Final,
group_by.clone(),
aggr_expr.clone(),
vec![None],
@@ -954,7 +954,7 @@
)?);

let agg_final = Arc::new(AggregateExec::try_new(
AggregateMode::FinalPartitioned,
AggregateMode::Final,
group_by.clone(),
aggr_expr.clone(),
vec![None],
15 changes: 4 additions & 11 deletions datafusion/physical-optimizer/src/combine_partial_final_agg.rs
@@ -58,10 +58,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
return Ok(Transformed::no(plan));
};

if !matches!(
agg_exec.mode(),
AggregateMode::Final | AggregateMode::FinalPartitioned
) {
if !matches!(agg_exec.mode(), AggregateMode::Final) {
return Ok(Transformed::no(plan));
}

@@ -85,13 +82,9 @@
input_agg_exec.filter_expr(),
),
) {
let mode = if agg_exec.mode() == &AggregateMode::Final {
AggregateMode::Single
} else {
AggregateMode::SinglePartitioned
};
AggregateExec::try_new(
mode,
AggregateExec::try_new_with_settings_from(
agg_exec,
AggregateMode::Single,
input_agg_exec.group_expr().clone(),
input_agg_exec.aggr_expr().to_vec(),
input_agg_exec.filter_expr().to_vec(),
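With FinalPartitioned gone, CombinePartialFinalAggregate only has to match AggregateMode::Final and always collapses the Partial/Final pair into AggregateMode::Single. The rebuild goes through AggregateExec::try_new_with_settings_from, another constructor added by this PR whose signature is not visible here. A plausible sketch, assuming it forwards node-level settings such as the single-output-partition requirement from an existing node (an assumption, not the actual code):

    impl AggregateExec {
        /// Sketch only: build a new AggregateExec, inheriting node-level settings
        /// (e.g. the single-output-partition requirement) from `template`.
        pub fn try_new_with_settings_from(
            template: &AggregateExec,
            mode: AggregateMode,
            group_by: PhysicalGroupBy,
            aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
            filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
            input: Arc<dyn ExecutionPlan>,
            input_schema: SchemaRef,
        ) -> Result<Self> {
            Self::try_new_with_require_single_output_partition(
                mode,
                group_by,
                aggr_expr,
                filter_expr,
                input,
                input_schema,
                template.require_single_output_partition, // assumed field
            )
        }
    }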
15 changes: 11 additions & 4 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -393,7 +393,12 @@ pub fn adjust_input_keys_ordering(
.map(Transformed::yes);
} else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
if !requirements.data.is_empty() {
if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
if aggregate_exec.mode() == &AggregateMode::Final
&& matches!(
aggregate_exec.required_input_distribution().first(),
Some(Distribution::HashPartitioned(_))
)
{
return reorder_aggregate_keys(requirements, aggregate_exec)
.map(Transformed::yes);
} else {
@@ -504,7 +509,8 @@ pub fn reorder_aggregate_keys(
.into_iter()
.map(|idx| group_exprs[idx].clone())
.collect();
let partial_agg = Arc::new(AggregateExec::try_new(
let partial_agg = Arc::new(AggregateExec::try_new_with_settings_from(
agg_exec,
AggregateMode::Partial,
PhysicalGroupBy::new_single(new_group_exprs),
agg_exec.aggr_expr().to_vec(),
@@ -523,8 +529,9 @@
.map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
.collect(),
);
let new_final_agg = Arc::new(AggregateExec::try_new(
AggregateMode::FinalPartitioned,
let new_final_agg = Arc::new(AggregateExec::try_new_with_settings_from(
agg_exec,
AggregateMode::Final,
new_group_by,
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
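Because the aggregate mode no longer distinguishes partitioned from single-partition final aggregation, enforce_distribution now asks the node for its required input distribution. The condition it checks is essentially the following predicate, restated from the diff (the free-standing helper is illustrative):

    /// Illustrative restatement of the new condition in adjust_input_keys_ordering:
    /// a Final aggregate whose first child must be hash-partitioned on the group keys.
    fn wants_hash_partitioned_input(agg: &AggregateExec) -> bool {
        agg.mode() == &AggregateMode::Final
            && matches!(
                agg.required_input_distribution().first(),
                Some(Distribution::HashPartitioned(_))
            )
    }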
54 changes: 52 additions & 2 deletions datafusion/physical-optimizer/src/limited_distinct_aggregation.rs
@@ -20,7 +20,8 @@

use std::sync::Arc;

use datafusion_physical_plan::aggregates::{AggregateExec, LimitOptions};
use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, LimitOptions};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

@@ -54,7 +55,8 @@ impl LimitedDistinctAggregation {
}

// We found what we want: clone, copy the limit down, and return modified node
let new_aggr = AggregateExec::try_new(
let new_aggr = AggregateExec::try_new_with_settings_from(
aggr,
*aggr.mode(),
aggr.group_expr().clone(),
aggr.aggr_expr().to_vec(),
@@ -64,6 +66,54 @@
)
.expect("Unable to copy Aggregate!")
.with_limit_options(Some(LimitOptions::new(limit)));

if matches!(aggr.mode(), AggregateMode::Final)
&& let (child_plan, wrap_coalesce) = if let Some(coalesce) = aggr
.input()
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
(Arc::clone(coalesce.input()), true)
} else {
(Arc::clone(aggr.input()), false)
}
&& let Some(child_agg) = child_plan.as_any().downcast_ref::<AggregateExec>()
&& matches!(child_agg.mode(), AggregateMode::Partial)
&& !child_agg.group_expr().has_grouping_set()
{
let new_child = AggregateExec::try_new_with_settings_from(
child_agg,
AggregateMode::Partial,
child_agg.group_expr().clone(),
child_agg.aggr_expr().to_vec(),
child_agg.filter_expr().to_vec(),
child_agg.input().to_owned(),
child_agg.input_schema(),
)
.expect("Unable to copy Aggregate!")
.with_limit_options(Some(LimitOptions::new(limit)));

let new_input: Arc<dyn ExecutionPlan> = if wrap_coalesce {
Arc::new(CoalescePartitionsExec::new(Arc::new(new_child)))
} else {
Arc::new(new_child)
};

let rebuilt_final = AggregateExec::try_new_with_settings_from(
&new_aggr,
AggregateMode::Final,
new_aggr.group_expr().clone(),
new_aggr.aggr_expr().to_vec(),
new_aggr.filter_expr().to_vec(),
new_input,
new_aggr.input_schema(),
)
.expect("Unable to copy Aggregate!")
.with_limit_options(Some(LimitOptions::new(limit)));

return Some(Arc::new(rebuilt_final));
}

Some(Arc::new(new_aggr))
}
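The new branch above handles the common Partial/Final pair: when the Final aggregate is fed, directly or through a CoalescePartitionsExec, by a Partial aggregate without grouping sets, the limit is copied onto the Partial aggregate as well and the Final aggregate is rebuilt on top of the rewritten child. The intended effect on the plan shape is roughly the following (rendering illustrative, not actual EXPLAIN output):

    // before:
    //   AggregateExec: mode=Final, lim=[5]
    //     CoalescePartitionsExec
    //       AggregateExec: mode=Partial              (no limit; aggregates everything)
    //
    // after:
    //   AggregateExec: mode=Final, lim=[5]
    //     CoalescePartitionsExec
    //       AggregateExec: mode=Partial, lim=[5]     (each partition can stop after 5 groups)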
