From f2b1e757d734c91b8579cc4f7a5d4c1935c8fbe9 Mon Sep 17 00:00:00 2001 From: Yu-Chuan Hung Date: Fri, 15 May 2026 00:34:01 +0800 Subject: [PATCH 1/2] fix: propagate inner-field metadata through make_array and array_agg - Add `nullable_inner_field_from` / `nullable_list_item_field_from` helpers in `datafusion-common::utils` (built on `FieldExt::renamed`). - Extend `SingleRowListArrayBuilder::with_field` to also propagate metadata. - `make_array`: add `return_field_from_args`; thread inner `FieldRef` through new `array_array_with_field` runtime variant. - `array_agg`: add `return_field` and `state_fields` overrides; all four accumulators (`ArrayAgg`, `Distinct`, `OrderSensitive`, `Groups`) now carry `FieldRef` instead of `DataType`, propagating metadata. - Add SLT `array_metadata_propagation.slt` covering `make_array` and `array_agg`. - Update memory-accounting tests for new struct layout. --- datafusion/common/src/utils/mod.rs | 60 +++- .../functions-aggregate/benches/array_agg.rs | 5 +- .../functions-aggregate/src/array_agg.rs | 263 +++++++++++------- datafusion/functions-nested/src/make_array.rs | 81 +++++- .../spark/src/function/aggregate/collect.rs | 4 +- .../test_files/array_metadata_propagation.slt | 50 ++++ 6 files changed, 337 insertions(+), 126 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/array_metadata_propagation.slt diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index acee7b7a84b02..6c1b7949121ba 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -24,6 +24,7 @@ pub mod proxy; pub mod string_utils; use crate::assert_or_internal_err; +use crate::datatype::FieldExt; use crate::error::{_exec_datafusion_err, _exec_err, _internal_datafusion_err}; use crate::{Result, ScalarValue}; use arrow::array::{ @@ -40,13 +41,13 @@ use arrow::compute::kernels::cmp::neq; use arrow::compute::kernels::length::length; use arrow::compute::{SortColumn, SortOptions, partition}; 
use arrow::datatypes::{ - ArrowNativeType, DataType, Field, Int32Type, Int64Type, SchemaRef, + ArrowNativeType, DataType, Field, FieldRef, Int32Type, Int64Type, SchemaRef, }; #[cfg(feature = "sql")] use sqlparser::{ast::Ident, dialect::GenericDialect, parser::Parser}; use std::borrow::{Borrow, Cow}; use std::cmp::{Ordering, min}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::iter::repeat_n; use std::num::NonZero; use std::ops::Range; @@ -425,6 +426,8 @@ pub struct SingleRowListArrayBuilder { /// Specify the field name for the resulting array. Defaults to value used in /// [`Field::new_list_field`] field_name: Option, + /// Optional Arrow field metadata to attach to the resulting list's inner field. + field_metadata: Option>, } impl SingleRowListArrayBuilder { @@ -434,6 +437,7 @@ impl SingleRowListArrayBuilder { arr, nullable: true, field_name: None, + field_metadata: None, } } @@ -449,10 +453,17 @@ impl SingleRowListArrayBuilder { self } - /// Copies field name and nullable from the specified field - pub fn with_field(self, field: &Field) -> Self { - self.with_field_name(Some(field.name().to_owned())) - .with_nullable(field.is_nullable()) + /// Copies field name, nullable, and metadata from the specified field. + /// + /// Propagating metadata is required for Arrow extension types + /// (`ARROW:extension:name` / `ARROW:extension:metadata`) to round-trip + /// through SQL list constructors (e.g. `make_array`, `array_agg`). 
+ pub fn with_field(mut self, field: &Field) -> Self { + self.field_name = Some(field.name().to_owned()); + self.nullable = field.is_nullable(); + let metadata = field.metadata(); + self.field_metadata = (!metadata.is_empty()).then(|| metadata.clone()); + self } /// Build a single element [`ListArray`] @@ -524,16 +535,51 @@ impl SingleRowListArrayBuilder { arr, nullable, field_name, + field_metadata, } = self; let data_type = arr.data_type().to_owned(); - let field = match field_name { + let mut field = match field_name { Some(name) => Field::new(name, data_type, nullable), None => Field::new_list_field(data_type, nullable), }; + if let Some(metadata) = field_metadata { + field = field.with_metadata(metadata); + } (Arc::new(field), arr) } } +/// Return `inner` renamed to `name` with nullable forced to `true`, preserving +/// its data type and metadata. +/// +/// This is the canonical shape for the inner field of a composite SQL output +/// (the "item" field of a list, a `cN` member of a struct, the `key`/`value` +/// of a map, …). Preserving metadata is what lets Arrow extension types +/// (`ARROW:extension:name` / `ARROW:extension:metadata`) round-trip through +/// SQL list/struct/map constructors. +/// +/// Takes `inner` by value so the caller controls cloning; combined with +/// [`FieldExt::renamed`], the field is mutated in place when uniquely owned +/// and short-circuits entirely when both name and nullability already match. +/// At most one deep clone is performed. +/// +/// Differs from [`FieldExt::renamed`] in that this function also forces +/// nullability (the trait method preserves it). +pub fn nullable_inner_field_from(inner: FieldRef, name: &str) -> FieldRef { + let mut inner = inner.renamed(name); + if !inner.is_nullable() { + Arc::make_mut(&mut inner).set_nullable(true); + } + inner +} + +/// Build the canonical inner field of a list-shaped output (`List`/ +/// `LargeList`/`FixedSizeList`/`ListView`) from `inner`. 
Sugar for +/// [`nullable_inner_field_from`] with [`Field::LIST_FIELD_DEFAULT_NAME`]. +pub fn nullable_list_item_field_from(inner: FieldRef) -> FieldRef { + nullable_inner_field_from(inner, Field::LIST_FIELD_DEFAULT_NAME) +} + /// Wrap arrays into a single element `ListArray`. /// /// Example: diff --git a/datafusion/functions-aggregate/benches/array_agg.rs b/datafusion/functions-aggregate/benches/array_agg.rs index b0d8148c3ea65..1a062f5b27e76 100644 --- a/datafusion/functions-aggregate/benches/array_agg.rs +++ b/datafusion/functions-aggregate/benches/array_agg.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use arrow::array::{ Array, ArrayRef, ArrowPrimitiveType, AsArray, ListArray, NullBufferBuilder, }; -use arrow::datatypes::{Field, Int64Type}; +use arrow::datatypes::{Field, FieldRef, Int64Type}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::Accumulator; use datafusion_functions_aggregate::array_agg::ArrayAggAccumulator; @@ -41,11 +41,12 @@ pub fn seedable_rng() -> StdRng { #[expect(clippy::needless_pass_by_value)] fn merge_batch_bench(c: &mut Criterion, name: &str, values: ArrayRef) { let list_item_data_type = values.as_list::().values().data_type().clone(); + let list_item_field: FieldRef = Arc::new(Field::new("v", list_item_data_type, true)); c.bench_function(name, |b| { b.iter(|| { #[expect(clippy::unit_arg)] black_box( - ArrayAggAccumulator::try_new(&list_item_data_type, false) + ArrayAggAccumulator::try_new(&list_item_field, false) .unwrap() .merge_batch(std::slice::from_ref(&values)) .unwrap(), diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 24edaaff1f09d..43eaf8f7bd98e 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -17,22 +17,22 @@ //! 
`ARRAY_AGG` aggregate implementation: [`ArrayAgg`] -use std::cmp::Ordering; -use std::collections::{HashSet, VecDeque}; -use std::mem::{size_of, size_of_val, take}; -use std::sync::Arc; - use arrow::array::{ - Array, ArrayRef, AsArray, BooleanArray, ListArray, NullBufferBuilder, StructArray, - UInt32Array, new_empty_array, + Array, ArrayData, ArrayRef, AsArray, BooleanArray, ListArray, NullBufferBuilder, + StructArray, UInt32Array, new_empty_array, }; use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow::compute::{SortOptions, filter}; use arrow::datatypes::{DataType, Field, FieldRef, Fields}; +use std::cmp::Ordering; +use std::collections::{HashSet, VecDeque}; +use std::mem::{size_of, size_of_val, take}; +use std::sync::Arc; use datafusion_common::cast::as_list_array; use datafusion_common::utils::{ - SingleRowListArrayBuilder, compare_rows, get_row_at_idx, take_function_args, + SingleRowListArrayBuilder, compare_rows, get_row_at_idx, + nullable_list_item_field_from, take_function_args, }; use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err, exec_err}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -111,13 +111,27 @@ impl AggregateUDFImpl for ArrayAgg { )))) } + fn return_field(&self, arg_fields: &[FieldRef]) -> Result { + // Preserve the input field's metadata on the list's inner field so + // Arrow extension types (`ARROW:extension:*`) round-trip through + // `array_agg`. + let inner = nullable_list_item_field_from(Arc::clone(&arg_fields[0])); + Ok(Arc::new(Field::new( + self.name(), + DataType::List(inner), + self.is_nullable(), + ))) + } + fn state_fields(&self, args: StateFieldsArgs) -> Result> { + // See COMMENTS.md to understand why nullable is set to true. + // Preserve the input field's metadata on the list's inner field.
+ let inner = nullable_list_item_field_from(Arc::clone(&args.input_fields[0])); if args.is_distinct { return Ok(vec![ - Field::new_list( + Field::new( format_state_name(args.name, "distinct_array_agg"), - // See COMMENTS.md to understand why nullable is set to true - Field::new_list_field(args.input_fields[0].data_type().clone(), true), + DataType::List(inner), true, ) .into(), @@ -125,10 +139,9 @@ impl AggregateUDFImpl for ArrayAgg { } let mut fields = vec![ - Field::new_list( + Field::new( format_state_name(args.name, "array_agg"), - // See COMMENTS.md to understand why nullable is set to true - Field::new_list_field(args.input_fields[0].data_type().clone(), true), + DataType::List(inner), true, ) .into(), @@ -167,7 +180,6 @@ impl AggregateUDFImpl for ArrayAgg { fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { let field = &acc_args.expr_fields[0]; - let data_type = field.data_type(); let ignore_nulls = acc_args.ignore_nulls && field.is_nullable(); if acc_args.is_distinct { @@ -195,17 +207,14 @@ impl AggregateUDFImpl for ArrayAgg { } }; return Ok(Box::new(DistinctArrayAggAccumulator::try_new( - data_type, + field, sort_option, ignore_nulls, )?)); } let Some(ordering) = LexOrdering::new(acc_args.order_bys.to_vec()) else { - return Ok(Box::new(ArrayAggAccumulator::try_new( - data_type, - ignore_nulls, - )?)); + return Ok(Box::new(ArrayAggAccumulator::try_new(field, ignore_nulls)?)); }; let ordering_dtypes = ordering @@ -214,7 +223,7 @@ impl AggregateUDFImpl for ArrayAgg { .collect::>>()?; OrderSensitiveArrayAggAccumulator::try_new( - data_type, + field, &ordering_dtypes, ordering, self.is_input_pre_ordered, @@ -237,10 +246,9 @@ impl AggregateUDFImpl for ArrayAgg { args: AccumulatorArgs, ) -> Result> { let field = &args.expr_fields[0]; - let data_type = field.data_type().clone(); let ignore_nulls = args.ignore_nulls && field.is_nullable(); Ok(Box::new(ArrayAggGroupsAccumulator::new( - data_type, + field, ignore_nulls, ))) } @@ -257,7 +265,9 @@ impl 
AggregateUDFImpl for ArrayAgg { #[derive(Debug)] pub struct ArrayAggAccumulator { values: VecDeque, - datatype: DataType, + /// Source field for the aggregated values. Carries data type and metadata + /// so the resulting `List`'s inner field preserves the input's metadata. + value_field: FieldRef, ignore_nulls: bool, /// Number of elements already consumed (retracted) from the front array. /// Used by sliding window frames to avoid copying on partial retract. @@ -265,11 +275,13 @@ pub struct ArrayAggAccumulator { } impl ArrayAggAccumulator { - /// new array_agg accumulator based on given item data type - pub fn try_new(datatype: &DataType, ignore_nulls: bool) -> Result { + /// New `array_agg` accumulator based on the given item field. Uses + /// `value_field`'s data type and metadata when constructing the output + /// list. + pub fn try_new(value_field: &FieldRef, ignore_nulls: bool) -> Result { Ok(Self { values: VecDeque::new(), - datatype: datatype.clone(), + value_field: Arc::clone(value_field), ignore_nulls, front_offset: 0, }) @@ -399,7 +411,7 @@ impl Accumulator for ArrayAggAccumulator { fn evaluate(&mut self) -> Result { if self.values.is_empty() { - return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1)); + return Ok(empty_list_scalar(&self.value_field)); } let element_arrays: Vec = self @@ -419,12 +431,15 @@ impl Accumulator for ArrayAggAccumulator { element_arrays.iter().map(|a| a.as_ref()).collect(); if element_refs.iter().all(|a| a.is_empty()) { - return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1)); + return Ok(empty_list_scalar(&self.value_field)); } let concated_array = arrow::compute::concat(&element_refs)?; - Ok(SingleRowListArrayBuilder::new(concated_array).build_list_scalar()) + let inner = nullable_list_item_field_from(Arc::clone(&self.value_field)); + Ok(SingleRowListArrayBuilder::new(concated_array) + .with_field(&inner) + .build_list_scalar()) } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { 
@@ -482,14 +497,16 @@ impl Accumulator for ArrayAggAccumulator { // accumulator might not own any data. .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default()) .sum::() - + self.datatype.size() - - size_of_val(&self.datatype) + + self.value_field.data_type().size() + - size_of_val(self.value_field.data_type()) } } #[derive(Debug)] struct ArrayAggGroupsAccumulator { - datatype: DataType, + /// Source field for the aggregated values. Carries data type and metadata + /// so the resulting `List`'s inner field preserves the input's metadata. + value_field: FieldRef, ignore_nulls: bool, /// Source arrays — input arrays (from update_batch) or list backing /// arrays (from merge_batch). @@ -501,9 +518,9 @@ struct ArrayAggGroupsAccumulator { } impl ArrayAggGroupsAccumulator { - fn new(datatype: DataType, ignore_nulls: bool) -> Self { + fn new(value_field: &FieldRef, ignore_nulls: bool) -> Self { Self { - datatype, + value_field: Arc::clone(value_field), ignore_nulls, batches: Vec::new(), batch_entries: Vec::new(), @@ -690,7 +707,7 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator { // Step 3: Scatter entries into group order using the counting sort. The // batch index is implicit from the outer loop position. 
let flat_values = if total_rows == 0 { - new_empty_array(&self.datatype) + new_empty_array(self.value_field.data_type()) } else { let mut interleave_indices = vec![(0usize, 0usize); total_rows]; for (batch_idx, entries) in self.batch_entries.iter().enumerate() { @@ -716,7 +733,7 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator { } let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets)); - let field = Arc::new(Field::new_list_field(self.datatype.clone(), true)); + let field = nullable_list_item_field_from(Arc::clone(&self.value_field)); let result = ListArray::new(field, offsets, flat_values, nulls_builder.finish()); Ok(Arc::new(result)) @@ -787,7 +804,7 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator { filter_nulls }; - let field = Arc::new(Field::new_list_field(self.datatype.clone(), true)); + let field = nullable_list_item_field_from(Arc::clone(&self.value_field)); let list_array = ListArray::new(field, offsets, Arc::clone(input), nulls); Ok(vec![Arc::new(list_array)]) @@ -815,20 +832,22 @@ impl GroupsAccumulator for ArrayAggGroupsAccumulator { #[derive(Debug)] pub struct DistinctArrayAggAccumulator { values: HashSet, - datatype: DataType, + /// Source field for the aggregated values. Carries data type and metadata + /// so the resulting `List`'s inner field preserves the input's metadata. 
+ value_field: FieldRef, sort_options: Option, ignore_nulls: bool, } impl DistinctArrayAggAccumulator { pub fn try_new( - datatype: &DataType, + value_field: &FieldRef, sort_options: Option, ignore_nulls: bool, ) -> Result { Ok(Self { values: HashSet::new(), - datatype: datatype.clone(), + value_field: Arc::clone(value_field), sort_options, ignore_nulls, }) @@ -882,7 +901,7 @@ impl Accumulator for DistinctArrayAggAccumulator { fn evaluate(&mut self) -> Result { let mut values: Vec = self.values.iter().cloned().collect(); if values.is_empty() { - return Ok(ScalarValue::new_null_list(self.datatype.clone(), true, 1)); + return Ok(empty_list_scalar(&self.value_field)); } if let Some(opts) = self.sort_options { @@ -912,15 +931,14 @@ impl Accumulator for DistinctArrayAggAccumulator { delayed_cmp_err?; }; - let arr = ScalarValue::new_list(&values, &self.datatype, true); - Ok(ScalarValue::List(arr)) + values_to_list_scalar(&values, &self.value_field, true) } fn size(&self) -> usize { size_of_val(self) + ScalarValue::size_of_hashset(&self.values) - size_of_val(&self.values) - + self.datatype.size() - - size_of_val(&self.datatype) + + self.value_field.data_type().size() + - size_of_val(self.value_field.data_type()) - size_of_val(&self.sort_options) + size_of::>() } @@ -938,9 +956,12 @@ pub(crate) struct OrderSensitiveArrayAggAccumulator { /// different partitions. For detailed information how merging is done, see /// [`merge_ordered_arrays`]. ordering_values: Vec>, - /// Stores datatypes of expressions inside values and ordering requirement + /// Source field for the aggregated values. Carries data type and metadata + /// so the resulting `List`'s inner field preserves the input's metadata. + value_field: FieldRef, + /// Stores datatypes of expressions inside ordering requirement /// expressions. - datatypes: Vec, + ordering_dtypes: Vec, /// Stores the ordering requirement of the `Accumulator`. 
ordering_req: LexOrdering, /// Whether the input is known to be pre-ordered @@ -953,21 +974,20 @@ pub(crate) struct OrderSensitiveArrayAggAccumulator { impl OrderSensitiveArrayAggAccumulator { /// Create a new order-sensitive ARRAY_AGG accumulator based on the given - /// item data type. + /// item field. pub fn try_new( - datatype: &DataType, + value_field: &FieldRef, ordering_dtypes: &[DataType], ordering_req: LexOrdering, is_input_pre_ordered: bool, reverse: bool, ignore_nulls: bool, ) -> Result { - let mut datatypes = vec![datatype.clone()]; - datatypes.extend(ordering_dtypes.iter().cloned()); Ok(Self { values: vec![], ordering_values: vec![], - datatypes, + value_field: Arc::clone(value_field), + ordering_dtypes: ordering_dtypes.to_vec(), ordering_req, is_input_pre_ordered, reverse, @@ -998,7 +1018,7 @@ impl OrderSensitiveArrayAggAccumulator { } fn evaluate_orderings(&self) -> Result { - let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]); + let fields = ordering_fields(&self.ordering_req, &self.ordering_dtypes); let column_wise_ordering_values = if self.ordering_values.is_empty() { fields @@ -1151,24 +1171,15 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { } if self.values.is_empty() { - return Ok(ScalarValue::new_null_list( - self.datatypes[0].clone(), - true, - 1, - )); + return Ok(empty_list_scalar(&self.value_field)); } - let values = self.values.clone(); - let array = if self.reverse { - ScalarValue::new_list_from_iter( - values.into_iter().rev(), - &self.datatypes[0], - true, - ) + let values: Vec = if self.reverse { + self.values.iter().rev().cloned().collect() } else { - ScalarValue::new_list_from_iter(values.into_iter(), &self.datatypes[0], true) + self.values.clone() }; - Ok(ScalarValue::List(array)) + values_to_list_scalar(&values, &self.value_field, true) } fn size(&self) -> usize { @@ -1181,9 +1192,11 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { total += ScalarValue::size_of_vec(row) - 
size_of_val(row); } - // Add size of the `self.datatypes` - total += size_of::() * self.datatypes.capacity(); - for dtype in &self.datatypes { + // Add size of the value field's data type and the ordering datatypes. + total += self.value_field.data_type().size() + - size_of_val(self.value_field.data_type()); + total += size_of::() * self.ordering_dtypes.capacity(); + for dtype in &self.ordering_dtypes { total += dtype.size() - size_of_val(dtype); } @@ -1194,6 +1207,39 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { } } +/// Build a single-element `ScalarValue::List` containing one null entry whose +/// inner field carries `value_field`'s data type and metadata. +/// +/// Used by the array_agg accumulators when they produce the all-NULL fallback +/// list, so Arrow extension-type metadata round-trips even when no rows were +/// aggregated. +fn empty_list_scalar(value_field: &FieldRef) -> ScalarValue { + let inner = nullable_list_item_field_from(Arc::clone(value_field)); + let data_type = DataType::List(inner); + ScalarValue::List(Arc::new(ListArray::from(ArrayData::new_null( + &data_type, 1, + )))) +} + +/// Build a single-element `ScalarValue::List` from a slice of `ScalarValue`s, +/// preserving `value_field`'s data type and metadata on the resulting list's +/// inner field. +fn values_to_list_scalar( + values: &[ScalarValue], + value_field: &FieldRef, + nullable: bool, +) -> Result { + let arr = if values.is_empty() { + new_empty_array(value_field.data_type()) + } else { + ScalarValue::iter_to_array(values.iter().cloned())? + }; + let inner = nullable_list_item_field_from(Arc::clone(value_field)); + Ok(SingleRowListArrayBuilder::new(arr) + .with_field(&inner) + .with_nullable(nullable) + .build_list_scalar()) +} #[cfg(test)] mod tests { use super::*; @@ -1204,6 +1250,12 @@ mod tests { use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::expressions::Column; + /// Build a nullable item field for use in accumulator constructors. 
The + /// name is arbitrary — accumulators rename it to "item" when emitting. + fn item_field(data_type: DataType) -> FieldRef { + Arc::new(Field::new("v", data_type, true)) + } + #[test] fn no_duplicates_no_distinct() -> Result<()> { let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; @@ -1471,7 +1523,7 @@ mod tests { acc2.update_batch(&[data(["b", "c", "a"])])?; acc1 = merge(acc1, acc2)?; - assert_eq!(acc1.size(), 282); + assert_eq!(acc1.size(), 266); Ok(()) } @@ -1489,7 +1541,7 @@ mod tests { acc1 = merge(acc1, acc2)?; // without compaction, the size is 16660 - assert_eq!(acc1.size(), 1660); + assert_eq!(acc1.size(), 1644); Ok(()) } @@ -1507,7 +1559,7 @@ mod tests { ])])?; // without compaction, the size is 17112 - assert_eq!(acc.size(), 2224); + assert_eq!(acc.size(), 2160); Ok(()) } @@ -1657,7 +1709,7 @@ mod tests { #[test] fn groups_accumulator_multiple_batches() -> Result<()> { - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); // First batch let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); @@ -1676,7 +1728,7 @@ mod tests { #[test] fn groups_accumulator_emit_first() -> Result<()> { - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30])); acc.update_batch(&[values], &[0, 1, 2], None, 3)?; @@ -1701,7 +1753,7 @@ mod tests { // both groups. After emitting group 0, batch 0 should be // dropped entirely and batch 1 should be compacted to the // retained row(s). 
- let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let batch0: ArrayRef = Arc::new(Int32Array::from(vec![10, 20])); acc.update_batch(&[batch0], &[0, 0], None, 2)?; @@ -1739,7 +1791,7 @@ mod tests { #[test] fn groups_accumulator_emit_first_compacts_mixed_batches() -> Result<()> { - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let batch: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40])); acc.update_batch(&[batch], &[0, 1, 0, 1], None, 2)?; @@ -1767,7 +1819,7 @@ mod tests { #[test] fn groups_accumulator_emit_all_releases_capacity() -> Result<()> { - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let batch: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64)); acc.update_batch( @@ -1790,7 +1842,7 @@ mod tests { #[test] fn groups_accumulator_null_groups() -> Result<()> { // Groups that never receive values should produce null - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![1])); // Only group 0 gets a value, groups 1 and 2 are empty @@ -1804,7 +1856,7 @@ mod tests { #[test] fn groups_accumulator_ignore_nulls() -> Result<()> { - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), true); let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None])); @@ -1821,7 +1873,7 @@ mod tests { #[test] fn groups_accumulator_opt_filter() -> Result<()> { - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = 
ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4])); // Use a mix of false and null to filter out rows — both should @@ -1840,12 +1892,14 @@ mod tests { fn groups_accumulator_state_merge_roundtrip() -> Result<()> { // Accumulator 1: update_batch, then merge, then update_batch again. // Verifies that values appear in chronological insertion order. - let mut acc1 = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc1 = + ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); acc1.update_batch(&[values], &[0, 1], None, 2)?; // Accumulator 2 - let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc2 = + ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![3, 4])); acc2.update_batch(&[values], &[0, 1], None, 2)?; @@ -1869,7 +1923,7 @@ mod tests { #[test] fn groups_accumulator_convert_to_state() -> Result<()> { - let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(10), None, Some(30)])); let state = acc.convert_to_state(&[values], None)?; @@ -1890,7 +1944,7 @@ mod tests { #[test] fn groups_accumulator_convert_to_state_with_filter() -> Result<()> { - let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30])); let filter = BooleanArray::from(vec![true, false, true]); @@ -1913,13 +1967,14 @@ mod tests { fn groups_accumulator_convert_to_state_merge_preserves_nulls() -> Result<()> { // Verifies that null values survive the convert_to_state -> merge_batch // round-trip when ignore_nulls is false 
(default null handling). - let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); let state = acc.convert_to_state(&[values], None)?; // Feed state into a new accumulator via merge_batch - let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc2 = + ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); acc2.merge_batch(&state, &[0, 0, 1], None, 2)?; // Group 0 received rows 0 ([1]) and 1 ([NULL]) → [1, NULL] @@ -1935,7 +1990,7 @@ mod tests { fn groups_accumulator_convert_to_state_merge_ignore_nulls() -> Result<()> { // Verifies that null values are dropped in the convert_to_state -> // merge_batch round-trip when ignore_nulls is true. - let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true); + let acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), true); let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None])); @@ -1949,7 +2004,7 @@ mod tests { assert!(list.is_null(3)); // Feed state into a new accumulator via merge_batch - let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, true); + let mut acc2 = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), true); acc2.merge_batch(&state, &[0, 0, 1, 1], None, 2)?; // Group 0: received [1] and null (skipped) → [1] @@ -1963,7 +2018,7 @@ mod tests { #[test] fn groups_accumulator_all_groups_empty() -> Result<()> { - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), false); // Create groups but don't add any values (all filtered out) let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); @@ -1980,7 +2035,7 @@ mod tests { fn groups_accumulator_ignore_nulls_all_null_group() -> Result<()> { // When ignore_nulls is true and a group receives 
only nulls, // it should produce a null output - let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true); + let mut acc = ArrayAggGroupsAccumulator::new(&item_field(DataType::Int32), true); let values: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(1), None])); acc.update_batch(&[values], &[0, 1, 0], None, 2)?; @@ -1996,7 +2051,7 @@ mod tests { #[test] fn retract_basic_sliding_window() -> Result<()> { - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), false)?; // Simulate ROWS BETWEEN 1 PRECEDING AND CURRENT ROW over [A, B, C, D] // Row 1: frame = [A] @@ -2022,7 +2077,7 @@ mod tests { #[test] fn retract_multi_element_across_arrays() -> Result<()> { - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), false)?; // First batch: 3 elements acc.update_batch(&[data(["A", "B", "C"])])?; @@ -2052,7 +2107,7 @@ mod tests { #[test] fn retract_with_nulls_preserved() -> Result<()> { // ignore_nulls = false: NULLs are stored and counted for retract - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), false)?; acc.update_batch(&[data([Some("A"), None, Some("C")])])?; assert_eq!( @@ -2071,7 +2126,7 @@ mod tests { fn retract_with_ignore_nulls() -> Result<()> { // ignore_nulls = true: NULLs are NOT stored by update_batch, // so retract must only count non-null values - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), true)?; // update_batch with [A, NULL, C] → stores only [A, C] (NULL filtered) acc.update_batch(&[data([Some("A"), None, Some("C")])])?; @@ -2096,7 +2151,7 @@ mod tests { #[test] fn retract_ignore_nulls_all_nulls_batch() -> Result<()> { // When ignore_nulls = true and retract batch is all 
NULLs, nothing is retracted - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), true)?; acc.update_batch(&[data([Some("A"), Some("B")])])?; assert_eq!(print_nulls(str_arr(acc.evaluate()?)?), vec!["A", "B"]); @@ -2110,7 +2165,7 @@ mod tests { #[test] fn retract_empty_accumulator() -> Result<()> { - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), false)?; // Retract on empty accumulator should be a no-op acc.retract_batch(&[data(["A"])])?; @@ -2133,7 +2188,7 @@ mod tests { // Row 3 (ts=3): no change (same frame [0..4)) // Row 4 (ts=4): retract [A] (ts=1 leaves, partial consume) // Row 5 (ts=100): retract [B,C,D] (3-element retract spanning arrays) - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), false)?; // Row 1: update_batch(["A","B","C"]) acc.update_batch(&[data(["A", "B", "C"])])?; @@ -2162,7 +2217,7 @@ mod tests { #[test] fn retract_update_after_full_drain() -> Result<()> { // Verify accumulator works correctly after being fully drained - let mut acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), false)?; acc.update_batch(&[data(["A", "B"])])?; acc.retract_batch(&[data(["A", "B"])])?; @@ -2186,10 +2241,10 @@ mod tests { #[test] fn retract_supports_retract_batch() -> Result<()> { - let acc = ArrayAggAccumulator::try_new(&DataType::Utf8, false)?; + let acc = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), false)?; assert!(acc.supports_retract_batch()); - let acc_ignore = ArrayAggAccumulator::try_new(&DataType::Utf8, true)?; + let acc_ignore = ArrayAggAccumulator::try_new(&item_field(DataType::Utf8), true)?; assert!(acc_ignore.supports_retract_batch()); Ok(()) @@ -2205,7 +2260,7 @@ mod tests { 
let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(dict_type.clone()), true)?; // Dictionary values: ["hello", NULL, "world"] // Keys: [0, 1, 2, 1] — all valid, but keys 1 and 3 point to null value @@ -2257,7 +2312,7 @@ mod tests { let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); - let mut acc = ArrayAggAccumulator::try_new(&dict_type, true)?; + let mut acc = ArrayAggAccumulator::try_new(&item_field(dict_type.clone()), true)?; // update with ["A", "B", "C"] (no nulls) let values = StringArray::from(vec!["A", "B", "C"]); diff --git a/datafusion/functions-nested/src/make_array.rs b/datafusion/functions-nested/src/make_array.rs index 32af5df2c6019..068ca1fd89bbf 100644 --- a/datafusion/functions-nested/src/make_array.rs +++ b/datafusion/functions-nested/src/make_array.rs @@ -27,15 +27,17 @@ use arrow::array::{ }; use arrow::buffer::OffsetBuffer; use arrow::datatypes::DataType; -use arrow::datatypes::{DataType::Null, Field}; -use datafusion_common::utils::SingleRowListArrayBuilder; +use arrow::datatypes::{DataType::Null, Field, FieldRef}; +use datafusion_common::utils::{ + SingleRowListArrayBuilder, nullable_list_item_field_from, +}; use datafusion_common::{Result, plan_err}; use datafusion_expr::binary::{ try_type_union_resolution_with_struct, type_union_resolution, }; use datafusion_expr::{ - ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, - Volatility, + ColumnarValue, Documentation, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, + Signature, Volatility, }; use datafusion_macros::user_doc; use itertools::Itertools as _; @@ -105,8 +107,34 @@ impl ScalarUDFImpl for MakeArray { Ok(DataType::new_list(element_type, true)) } + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + // Pick the first non-Null argument's field 
as the source of element + // type and metadata; fall back to Null if all inputs are Null. + // Coercion has already unified element types, so any non-Null input + // is representative. + let inner = args + .arg_fields + .iter() + .find(|f| !f.data_type().is_null()) + .map(|f| nullable_list_item_field_from(Arc::clone(f))) + .unwrap_or_else(|| Arc::new(Field::new_list_field(Null, true))); + Ok(Arc::new(Field::new( + self.name(), + DataType::List(inner), + true, + ))) + } + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(make_array_inner)(&args.args) + let inner_field = match args.return_field.data_type() { + DataType::List(field) + | DataType::LargeList(field) + | DataType::FixedSizeList(field, _) => Some(Arc::clone(field)), + _ => None, + }; + make_scalar_function(move |arrays: &[ArrayRef]| { + make_array_inner_with_field(arrays, inner_field.clone()) + })(&args.args) } fn aliases(&self) -> &[String] { @@ -130,6 +158,16 @@ impl ScalarUDFImpl for MakeArray { /// Constructs an array using the input `data` as `ArrayRef`. /// Returns a reference-counted `Array` instance result. pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result { + make_array_inner_with_field(arrays, None) +} + +/// Like [`make_array_inner`] but uses `inner_field` for the resulting list's +/// inner field when supplied, so callers can propagate the input field's +/// metadata onto the constructed `ListArray`'s inner field. 
+pub(crate) fn make_array_inner_with_field( + arrays: &[ArrayRef], + inner_field: Option, +) -> Result { let data_type = arrays.iter().find_map(|arg| { let arg_type = arg.data_type(); (!arg_type.is_null()).then_some(arg_type) @@ -140,11 +178,20 @@ pub(crate) fn make_array_inner(arrays: &[ArrayRef]) -> Result { // Either an empty array or all nulls: let length = arrays.iter().map(|a| a.len()).sum(); let array = new_null_array(&Null, length); - Ok(Arc::new( - SingleRowListArrayBuilder::new(array).build_list_array(), - )) + let mut builder = SingleRowListArrayBuilder::new(array); + if let Some(field) = inner_field.as_ref() { + builder = builder.with_field(field); + } + Ok(Arc::new(builder.build_list_array())) } else { - array_array::(arrays, data_type.clone(), Field::LIST_FIELD_DEFAULT_NAME) + match inner_field { + Some(field) => array_array_with_field::(arrays, field), + None => array_array::( + arrays, + data_type.clone(), + Field::LIST_FIELD_DEFAULT_NAME, + ), + } } } @@ -192,17 +239,29 @@ pub fn array_array( args: &[ArrayRef], data_type: DataType, field_name: &str, +) -> Result { + array_array_with_field::(args, Arc::new(Field::new(field_name, data_type, true))) +} + +/// Same as [`array_array`] but takes a fully-formed inner [`FieldRef`] for the +/// resulting list. This lets callers propagate metadata (e.g. Arrow extension +/// type identifiers) onto the produced list's inner field. +pub fn array_array_with_field( + args: &[ArrayRef], + inner_field: FieldRef, ) -> Result { // do not accept 0 arguments. 
if args.is_empty() { return plan_err!("Array requires at least one argument"); } + let data_type = inner_field.data_type(); + let mut data = vec![]; let mut total_len = 0; for arg in args { let arg_data = if arg.as_any().is::() { - ArrayData::new_empty(&data_type) + ArrayData::new_empty(data_type) } else { arg.to_data() }; @@ -234,7 +293,7 @@ pub fn array_array( let data = mutable.freeze(); Ok(Arc::new(GenericListArray::::try_new( - Arc::new(Field::new(field_name, data_type, true)), + inner_field, OffsetBuffer::new(offsets.into()), arrow::array::make_array(data), None, diff --git a/datafusion/spark/src/function/aggregate/collect.rs b/datafusion/spark/src/function/aggregate/collect.rs index 5af0fd39cca07..1fa8089a0bc1e 100644 --- a/datafusion/spark/src/function/aggregate/collect.rs +++ b/datafusion/spark/src/function/aggregate/collect.rs @@ -85,7 +85,7 @@ impl AggregateUDFImpl for SparkCollectList { let data_type = field.data_type().clone(); let ignore_nulls = true; Ok(Box::new(NullToEmptyListAccumulator::new( - ArrayAggAccumulator::try_new(&data_type, ignore_nulls)?, + ArrayAggAccumulator::try_new(field, ignore_nulls)?, data_type, ))) } @@ -143,7 +143,7 @@ impl AggregateUDFImpl for SparkCollectSet { let data_type = field.data_type().clone(); let ignore_nulls = true; Ok(Box::new(NullToEmptyListAccumulator::new( - DistinctArrayAggAccumulator::try_new(&data_type, None, ignore_nulls)?, + DistinctArrayAggAccumulator::try_new(field, None, ignore_nulls)?, data_type, ))) } diff --git a/datafusion/sqllogictest/test_files/array_metadata_propagation.slt b/datafusion/sqllogictest/test_files/array_metadata_propagation.slt new file mode 100644 index 0000000000000..af4e5f02d540d --- /dev/null +++ b/datafusion/sqllogictest/test_files/array_metadata_propagation.slt @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Regression tests for https://github.com/apache/datafusion/issues/21982: +# UDFs/UDAFs that wrap an input column into a composite type (List, Struct, +# Map, ...) must propagate the input field's metadata onto the inner field +# of the output composite. The data type rendering used by `arrow_field` +# includes inner-field metadata, so we verify by string-matching the +# `data_type` projection. 
+ +# make_array preserves inner field metadata +query T +SELECT arrow_field(make_array(with_metadata(1, 'k', 'v')))['data_type'] +---- +List(Int64, metadata: {"k": "v"}) + +# array_agg preserves inner field metadata +query T +SELECT arrow_field(array_agg(with_metadata(column1, 'k', 'v')))['data_type'] +FROM (VALUES (1), (2), (3)) +---- +List(Int64, metadata: {"k": "v"}) + +# array_agg DISTINCT preserves inner field metadata +query T +SELECT arrow_field(array_agg(DISTINCT with_metadata(column1, 'k', 'v')))['data_type'] +FROM (VALUES (1), (2), (1)) +---- +List(Int64, metadata: {"k": "v"}) + +# array_agg ORDER BY preserves inner field metadata +query T +SELECT arrow_field(array_agg(with_metadata(column1, 'k', 'v') ORDER BY column1))['data_type'] +FROM (VALUES (3), (1), (2)) +---- +List(Int64, metadata: {"k": "v"}) From b40b9f6b8bb864aa4dac5ba270ed4910470420f9 Mon Sep 17 00:00:00 2001 From: Yu-Chuan Hung Date: Fri, 15 May 2026 00:46:45 +0800 Subject: [PATCH 2/2] ci: retrigger flaky macos-aarch64 job