From 4eb8af243ebffb558a52ee5a71c2bcebdbc8e27b Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Fri, 30 Jan 2026 18:14:10 +0100 Subject: [PATCH 1/4] update return type --- datafusion/catalog/src/information_schema.rs | 45 ++++++++++++++----- .../functions/src/datetime/date_trunc.rs | 26 +++++------ 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 52bfeca3d4282..458ece5282b5f 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -24,7 +24,7 @@ use crate::{CatalogProviderList, SchemaProvider, TableProvider}; use arrow::array::builder::{BooleanBuilder, UInt8Builder}; use arrow::{ array::{StringBuilder, UInt64Builder}, - datatypes::{DataType, Field, Schema, SchemaRef}, + datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}, record_batch::RecordBatch, }; use async_trait::async_trait; @@ -34,7 +34,9 @@ use datafusion_common::error::Result; use datafusion_common::types::NativeType; use datafusion_execution::TaskContext; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF}; +use datafusion_expr::{ + AggregateUDF, ReturnFieldArgs, ScalarUDF, Signature, TypeSignature, WindowUDF, +}; use datafusion_expr::{TableType, Volatility}; use datafusion_physical_plan::SendableRecordBatchStream; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; @@ -421,10 +423,24 @@ fn get_udf_args_and_return_types( Ok(arg_types .into_iter() .map(|arg_types| { - // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let arg_fields: Vec = arg_types + .iter() + .enumerate() + .map(|(i, t)| { + Arc::new(Field::new(format!("arg_{i}"), t.clone(), true)) + }) + .collect(); + let scalar_arguments = vec![None; arg_fields.len()]; let return_type = udf - .return_type(&arg_types) - .map(|t| remove_native_type_prefix(&NativeType::from(t))) + .return_field_from_args(ReturnFieldArgs { + arg_fields: &arg_fields, + scalar_arguments: &scalar_arguments, + }) + .map(|f| { + remove_native_type_prefix(&NativeType::from( + f.data_type().clone(), + )) + }) .ok(); let arg_types = arg_types .into_iter() @@ -447,11 +463,21 @@ fn get_udaf_args_and_return_types( Ok(arg_types .into_iter() .map(|arg_types| { - // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let arg_fields: Vec = arg_types + .iter() + .enumerate() + .map(|(i, t)| { + Arc::new(Field::new(format!("arg_{i}"), t.clone(), true)) + }) + .collect(); let return_type = udaf - .return_type(&arg_types) - .ok() - .map(|t| remove_native_type_prefix(&NativeType::from(t))); + .return_field(&arg_fields) + .map(|f| { + remove_native_type_prefix(&NativeType::from( + f.data_type().clone(), + )) + }) + .ok(); let arg_types = arg_types .into_iter() .map(|t| remove_native_type_prefix(&NativeType::from(t))) @@ -473,7 +499,6 @@ fn get_udwf_args_and_return_types( Ok(arg_types .into_iter() .map(|arg_types| { - // only handle the function which implemented [`ScalarUDFImpl::return_type`] method let arg_types = arg_types .into_iter() .map(|t| remove_native_type_prefix(&NativeType::from(t))) diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 951ce7e882936..6799f3d5a0cbb 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -38,7 +38,7 @@ use arrow::datatypes::{Field, FieldRef}; use datafusion_common::cast::as_primitive_array; use datafusion_common::types::{NativeType, logical_date, logical_string}; use datafusion_common::{ - DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, + DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err, }; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ @@ -223,27 +223,21 @@ impl ScalarUDFImpl for DateTruncFunc { &self.signature } - // keep return_type implementation for information schema generation - fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types[1].is_null() { - Ok(Timestamp(Nanosecond, None)) - } else { - Ok(arg_types[1].clone()) - } + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") } fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { - let data_types = args - .arg_fields - .iter() - .map(|f| f.data_type()) - .cloned() - .collect::>(); - let return_type = self.return_type(&data_types)?; + let field = &args.arg_fields[1]; + let return_type = if field.data_type().is_null() { + Timestamp(Nanosecond, None) + } else { + field.data_type().clone() + }; Ok(Arc::new(Field::new( self.name(), return_type, - args.arg_fields[1].is_nullable(), + field.is_nullable(), ))) } From 9d706cd7e8edaa506b176bb840a21f7d8de6006a Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Fri, 30 Jan 2026 18:55:11 +0100 Subject: [PATCH 2/4] update error message in return_type method for consistency --- datafusion/functions/src/datetime/date_trunc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 6799f3d5a0cbb..8497e583ba4bc 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -224,7 +224,7 @@ impl ScalarUDFImpl for DateTruncFunc { } fn return_type(&self, _arg_types: &[DataType]) -> Result { - internal_err!("return_field_from_args should be used instead") + internal_err!("return_field_from_args should be called instead") } fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { From e8d78e81765137be6fd7258e84cdcd62fdddd907 Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Fri, 30 Jan 2026 19:03:24 +0100 Subject: [PATCH 3/4] enhance get_udwf_args_and_return_types to include return type for window UDFs --- datafusion/catalog/src/information_schema.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/datafusion/catalog/src/information_schema.rs b/datafusion/catalog/src/information_schema.rs index 458ece5282b5f..ea93dc21a3f5b 100644 --- a/datafusion/catalog/src/information_schema.rs +++ b/datafusion/catalog/src/information_schema.rs @@ -34,6 +34,7 @@ use datafusion_common::error::Result; use datafusion_common::types::NativeType; use datafusion_execution::TaskContext; use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_expr::function::WindowUDFFieldArgs; use datafusion_expr::{ AggregateUDF, ReturnFieldArgs, ScalarUDF, Signature, TypeSignature, WindowUDF, }; @@ -499,11 +500,26 @@ fn get_udwf_args_and_return_types( Ok(arg_types .into_iter() .map(|arg_types| { + let arg_fields: Vec = arg_types + .iter() + .enumerate() + .map(|(i, t)| { + Arc::new(Field::new(format!("arg_{i}"), t.clone(), true)) + }) + .collect(); + let return_type = udwf + .field(WindowUDFFieldArgs::new(&arg_fields, udwf.name())) + .map(|f| { + remove_native_type_prefix(&NativeType::from( + f.data_type().clone(), + )) + }) + .ok(); let arg_types = arg_types .into_iter() .map(|t| remove_native_type_prefix(&NativeType::from(t))) .collect::>(); - (arg_types, None) + (arg_types, return_type) }) .collect::>()) } From 3461f9b8c3aa32eca3e0f4d7f8244ceef297c6e1 Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Sat, 31 Jan 2026 15:00:37 +0100 Subject: [PATCH 4/4] updated return in benches --- datafusion/functions/benches/date_trunc.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/functions/benches/date_trunc.rs b/datafusion/functions/benches/date_trunc.rs index f5c8ceb5fe9d5..664221f7ba2aa 100644 --- a/datafusion/functions/benches/date_trunc.rs +++ b/datafusion/functions/benches/date_trunc.rs @@ -25,7 +25,7 @@ use arrow::datatypes::Field; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_common::ScalarValue; use datafusion_common::config::ConfigOptions; -use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_expr::{ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs}; use datafusion_functions::datetime::date_trunc; use rand::Rng; use rand::rngs::ThreadRng; @@ -57,10 +57,13 @@ fn criterion_benchmark(c: &mut Criterion) { }) .collect::>(); - let return_type = udf - .return_type(&args.iter().map(|arg| arg.data_type()).collect::>()) + let scalar_arguments = vec![None; arg_fields.len()]; + let return_field = udf + .return_field_from_args(ReturnFieldArgs { + arg_fields: &arg_fields, + scalar_arguments: &scalar_arguments, + }) .unwrap(); - let return_field = Arc::new(Field::new("f", return_type, true)); let config_options = Arc::new(ConfigOptions::default()); b.iter(|| {