Skip to content

DataFrame API: allow aggregate functions in select() (#17874)#21021

Open
cj-zhukov wants to merge 3 commits intoapache:mainfrom
cj-zhukov:cj-zhukov/DataFrame-API-allow-aggregate-functions-in-select
Open

DataFrame API: allow aggregate functions in select() (#17874)#21021
cj-zhukov wants to merge 3 commits intoapache:mainfrom
cj-zhukov:cj-zhukov/DataFrame-API-allow-aggregate-functions-in-select

Conversation

@cj-zhukov
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

DataFrame::select currently does not support aggregate expressions directly.
Users must explicitly call DataFrame::aggregate. This PR improves ergonomics by allowing aggregate expressions inside select, while preserving existing behavior and validation rules.

What changes are included in this PR?

  • Added support for using aggregate functions directly in DataFrame::select
  • Updated snapshot tests to reflect the new logical plan datafusion-cli/tests/snapshots/cli_explain_environment_overrides@explain_plan_environment_overrides.snap

Are these changes tested?

  • Existing tests continue to pass
  • Added new test for this new use case test_dataframe_api_aggregate_fn_in_select

Are there any user-facing changes?

Yes (behavioral improvement, no API changes).
Users can now use aggregate functions directly in select() without explicitly calling aggregate().

ctx.table("aggregate_test_100")
    .await?
    .select(vec![
        approx_distinct(col("c9")).alias("count_c9"),
        approx_distinct(cast(col("c9"), arrow_schema::DataType::Utf8View))
            .alias("count_c9_str"),
    ])?
    .show()
    .await?;

Previously, this required:

ctx.table("aggregate_test_100")
    .await?
    .aggregate(
        vec![],
        vec![
            approx_distinct(col("c9")).alias("count_c9"),
            approx_distinct(cast(col("c9"), arrow_schema::DataType::Utf8View))
                .alias("count_c9_str"),
        ],
    )?
    .show()
    .await?;

Both syntaxes are now valid and can be used interchangeably.

There are no breaking changes to the public API.

@github-actions github-actions bot added the core Core DataFusion crate label Mar 18, 2026
@martin-g
Copy link
Member

You need to update the Insta snapshot for the datafusion-cli crate

| | "Plan": { |
| | "Expressions": [ |
| | "Int64(123)" |
| | ], |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, you modified the Insta snapshot!
But it is failing now.
So, maybe you have to revert the change ?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That helped - thank you!


let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
// Collect aggregate expressions
let aggr_exprs = find_aggregate_exprs(expressions.clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

find_aggregate_exprs() deduplicates the expressions.
Test like:

let res = df.select(vec![
        count(col("c9")).alias("count_c9"),
        count(col("c9")).alias("count_c9_str"),
    ])?;

fails with:


failures:

---- dataframe::test_dataframe_api_aggregate_fn_in_select2 stdout ----
Error: SchemaError(FieldNotFound { field: Column { relation: None, name: "__agg_1" }, valid_fields: [Column { relation: None, name: "__agg_0" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c1" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c2" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c3" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c4" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c5" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c6" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c7" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c8" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c9" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c10" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c11" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c12" }, Column { relation: Some(Bare { table: "aggregate_test_100" }), name: "c13" }] }, Some(""))

__agg_1 is lost

// Check if any expression is non-aggregate
let has_non_aggregate_expr = expressions
.clone()
.any(|expr| find_aggregate_exprs(std::iter::once(expr)).is_empty());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about aggregate expr + non-aggregate one ?
E.g.:

  let res = df.select(vec![
        count(col("c9")).alias("count_c9") + lit(1)
    ])?;

I'd expect 101 but it returns 100

);

Ok(())
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add tests for some more complex queries, e.g. df.select([sum(col("a")) + count(col("b"))]) and something with (qualified and non-qualified) wildcards too.

for (i, select_expr) in expr_list.into_iter().enumerate() {
match select_expr {
SelectExpr::Expression(expr) => {
let column = Expr::Column(Column::from_name(format!("__agg_{i}")));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

__agg_0 could collide with a real column. Is this how it is being done elsewhere ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants