DataFrame API: allow aggregate functions in select() (#17874) #21021
cj-zhukov wants to merge 3 commits into apache:main
You need to update the Insta snapshot for the datafusion-cli crate.

"Plan": {
  "Expressions": [
    "Int64(123)"
  ],
Hm, you modified the Insta snapshot, but it is failing now.
So maybe you have to revert the change?
That helped - thank you!
let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
// Collect aggregate expressions
let aggr_exprs = find_aggregate_exprs(expressions.clone());
find_aggregate_exprs() deduplicates the expressions.
A test like:
let res = df.select(vec![
count(col("c9")).alias("count_c9"),
count(col("c9")).alias("count_c9_str"),
])?;
fails with:
failures:
---- dataframe::test_dataframe_api_aggregate_fn_in_select2 stdout ----
Error: SchemaError(FieldNotFound { field: Column { relation: None, name: "__agg_1" }, valid_fields: [Column { relation: None, name: "__agg_0" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c1" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c2" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c3" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c4" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c5" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c6" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c7" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c8" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c9" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c10" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c11" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c12" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c13" }] }, Some(""))
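__agg_1 is lost: the two identical count(col("c9")) aggregates are deduplicated into a single __agg_0, but the projection still refers to one placeholder per select expression.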
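One possible way to avoid the lost placeholder is to derive each placeholder index from the expression's position in the deduplicated aggregate list instead of its position in the select list. The sketch below is a toy model only: plain strings stand in for expressions, and `placeholder_indices` is a hypothetical helper, not a DataFusion function.

```rust
/// Toy model: each aggregate expression is represented by its display string.
/// Returns the deduplicated aggregates and, for every select item, the index
/// of the aggregate (and therefore the placeholder) it should reference.
fn placeholder_indices(select_items: &[&str]) -> (Vec<String>, Vec<usize>) {
    let mut unique: Vec<String> = Vec::new();
    let mut indices: Vec<usize> = Vec::with_capacity(select_items.len());
    for item in select_items {
        // Reuse the index of an identical aggregate if one was already seen.
        let found = unique.iter().position(|u| u.as_str() == *item);
        let idx = match found {
            Some(existing) => existing,
            None => {
                unique.push(item.to_string());
                unique.len() - 1
            }
        };
        indices.push(idx);
    }
    (unique, indices)
}
```

With two identical `count(c9)` items, both select expressions would map to index 0, so both aliases can be projected from the same `__agg_0` column.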
// Check if any expression is non-aggregate
let has_non_aggregate_expr = expressions
    .clone()
    .any(|expr| find_aggregate_exprs(std::iter::once(expr)).is_empty());
What about an aggregate expression combined with a non-aggregate one?
E.g.:
let res = df.select(vec![
count(col("c9")).alias("count_c9") + lit(1)
])?;
I'd expect 101, but it returns 100.
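The case above suggests that aggregates nested inside a larger expression need to be replaced in place, with the surrounding arithmetic kept in the projection. A minimal sketch of that rewrite follows, using a toy `Expr` enum that is an assumption for illustration and not DataFusion's `Expr`; `extract_aggregates` and `eval` are likewise hypothetical.

```rust
/// Toy expression tree (illustrative only, not DataFusion's `Expr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Lit(i64),
    Column(String),
    Count(String),
    Add(Box<Expr>, Box<Expr>),
}

/// Replace every aggregate node with a placeholder column reference and
/// collect the (deduplicated) aggregates the Aggregate step must compute.
fn extract_aggregates(expr: &Expr, aggs: &mut Vec<Expr>) -> Expr {
    match expr {
        Expr::Count(_) => {
            let found = aggs.iter().position(|a| a == expr);
            let idx = match found {
                Some(i) => i,
                None => {
                    aggs.push(expr.clone());
                    aggs.len() - 1
                }
            };
            Expr::Column(format!("__agg_{idx}"))
        }
        // Keep the non-aggregate structure and recurse into both operands.
        Expr::Add(l, r) => Expr::Add(
            Box::new(extract_aggregates(l, aggs)),
            Box::new(extract_aggregates(r, aggs)),
        ),
        other => other.clone(),
    }
}

/// Evaluate the rewritten projection against precomputed aggregate values,
/// where `agg_values[i]` is the value of placeholder `__agg_i`.
fn eval(expr: &Expr, agg_values: &[i64]) -> i64 {
    match expr {
        Expr::Lit(v) => *v,
        Expr::Column(name) => {
            let idx: usize = name
                .trim_start_matches("__agg_")
                .parse()
                .expect("placeholder column");
            agg_values[idx]
        }
        Expr::Add(l, r) => eval(l, agg_values) + eval(r, agg_values),
        Expr::Count(_) => panic!("aggregates must be extracted first"),
    }
}
```

In this model, `count(c9) + 1` is rewritten to `__agg_0 + 1`, so an aggregate value of 100 evaluates to 101 rather than dropping the `+ lit(1)` part.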
    );

    Ok(())
}
Please add tests for some more complex queries, e.g. df.select([sum(col("a")) + count(col("b"))]) and something with (qualified and non-qualified) wildcards too.
for (i, select_expr) in expr_list.into_iter().enumerate() {
    match select_expr {
        SelectExpr::Expression(expr) => {
            let column = Expr::Column(Column::from_name(format!("__agg_{i}")));
__agg_0 could collide with a real column. Is this how it is done elsewhere?
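To illustrate the collision concern, here is a minimal sketch of choosing a placeholder name that is checked against the input schema's column names. It does not answer how DataFusion handles this elsewhere; `unique_placeholder` and its suffix scheme are assumptions made up for this example.

```rust
use std::collections::HashSet;

/// Pick a placeholder name that is not already a column in the input schema.
/// Hypothetical helper for illustration; not an existing DataFusion function.
fn unique_placeholder(existing: &HashSet<String>, index: usize) -> String {
    let mut candidate = format!("__agg_{index}");
    let mut suffix = 0usize;
    // Append an increasing numeric suffix until the name is free.
    while existing.contains(&candidate) {
        suffix += 1;
        candidate = format!("__agg_{index}_{suffix}");
    }
    candidate
}
```

If the input already has a column named `__agg_0`, this version would fall back to `__agg_0_1` for index 0 and leave other indices untouched.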
Which issue does this PR close?
Closes #17874.

Rationale for this change
DataFrame::select currently does not support aggregate expressions directly. Users must explicitly call DataFrame::aggregate. This PR improves ergonomics by allowing aggregate expressions inside select, while preserving existing behavior and validation rules.

What changes are included in this PR?
- DataFrame::select
- datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap

Are these changes tested?
- test_dataframe_api_aggregate_fn_in_select

Are there any user-facing changes?
Yes (behavioral improvement, no API changes).
Users can now use aggregate functions directly in select() without explicitly calling aggregate().
Previously, this required:
Both syntaxes are now valid and can be used interchangeably.
There are no breaking changes to the public API.