diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 038eb96b7b45..9a5c6747362d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -43,7 +43,7 @@ use crate::joins::hash_join::stream::{ use crate::joins::join_hash_map::{JoinHashMapU32, JoinHashMapU64}; use crate::joins::utils::{ OnceAsync, OnceFut, asymmetric_join_output_partitioning, reorder_output_after_swap, - swap_join_projection, update_hash, + swap_join_projection, update_hash_with_values, }; use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder}; use crate::metrics::{Count, MetricBuilder}; @@ -65,8 +65,8 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{ArrayRef, BooleanBufferBuilder}; -use arrow::compute::concat_batches; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder}; +use arrow::compute::concat; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -103,24 +103,21 @@ pub(crate) const HASH_JOIN_SEED: SeededRandomState = const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; -#[expect(clippy::too_many_arguments)] fn try_create_array_map( bounds: &Option, - schema: &SchemaRef, batches: &[RecordBatch], - on_left: &[PhysicalExprRef], + values_by_batch: &[Vec], reservation: &mut MemoryReservation, perfect_hash_join_small_build_threshold: usize, perfect_hash_join_min_key_density: f64, null_equality: NullEquality, -) -> Result)>> { - if on_left.len() != 1 { +) -> Result> { + if values_by_batch.first().map_or(0, Vec::len) != 1 { return Ok(None); } if null_equality == NullEquality::NullEqualsNull { - for batch in batches.iter() { - let arrays = evaluate_expressions_to_arrays(on_left, batch)?; + for arrays in values_by_batch { if arrays[0].null_count() > 0 { return Ok(None); } @@ -178,23 +175,52 @@ fn try_create_array_map( let mem_size 
= ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - let batch = concat_batches(schema, batches)?; - let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + let first_key_arrays: Vec<&dyn Array> = values_by_batch + .iter() + .map(|values| values[0].as_ref()) + .collect(); + let left_value = match first_key_arrays.as_slice() { + [] => return Ok(None), + [_] => Arc::clone(&values_by_batch[0][0]), + _ => concat(&first_key_arrays)?, + }; + + Ok(Some(ArrayMap::try_new(&left_value, min_val, max_val)?)) +} - let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; +fn concat_values_by_column(values_by_batch: &[Vec]) -> Result> { + let Some(first_batch) = values_by_batch.first() else { + return Ok(vec![]); + }; - Ok(Some((array_map, batch, left_values))) + (0..first_batch.len()) + .map(|value_index| { + let arrays: Vec<&dyn Array> = values_by_batch + .iter() + .map(|values| values[value_index].as_ref()) + .collect(); + if arrays.len() == 1 { + Ok(Arc::clone(&values_by_batch[0][value_index])) + } else { + Ok(concat(&arrays)?) 
+ } + }) + .collect() } /// HashTable and input data for the left (build side) of a join pub(super) struct JoinLeftData { - /// The hash table with indices into `batch` + /// The hash table with indices into the logical concatenation of `batches` /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown pub(super) map: Arc, - /// The input rows for the build side - batch: RecordBatch, - /// The build side on expressions values - values: Vec, + /// Schema for the build side input + schema: SchemaRef, + /// Build-side input batches in the same logical order as hash-map row indices + batches: Vec, + /// Inclusive start offset for each build-side batch, followed by the total row count + batch_offsets: Vec, + /// Per-build-batch values of the build-side join expressions + values: Vec>, /// Shared bitmap builder for visited left indices visited_indices_bitmap: SharedBitmapBuilder, /// Counter of running probe-threads, potentially @@ -225,16 +251,31 @@ impl JoinLeftData { &self.map } - /// returns a reference to the build side batch - pub(super) fn batch(&self) -> &RecordBatch { - &self.batch + /// returns the build side schema + pub(super) fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// returns a reference to the build side batches + pub(super) fn batches(&self) -> &[RecordBatch] { + &self.batches + } + + /// returns a reference to the build side batch offsets + pub(super) fn batch_offsets(&self) -> &[usize] { + &self.batch_offsets } /// returns a reference to the build side expressions values - pub(super) fn values(&self) -> &[ArrayRef] { + pub(super) fn values(&self) -> &[Vec] { &self.values } + /// returns an empty build-side batch with the correct schema + pub(super) fn empty_batch(&self) -> RecordBatch { + RecordBatch::new_empty(Arc::clone(&self.schema)) + } + /// returns a reference to the visited indices bitmap pub(super) fn visited_indices_bitmap(&self) -> &SharedBitmapBuilder { &self.visited_indices_bitmap @@ -1890,7 +1931,7 
@@ fn should_collect_min_max_for_perfect_hash( /// before updating the filter exactly once. /// /// # Returns -/// `JoinLeftData` containing the hash map, consolidated batch, join key values, +/// `JoinLeftData` containing the hash map, logical build-side batches, join key values, /// visited indices bitmap, and computed bounds (if requested). #[expect(clippy::too_many_arguments)] async fn collect_left_input( @@ -1965,81 +2006,99 @@ async fn collect_left_input( _ => None, }; - let (join_hash_map, batch, left_values) = - if let Some((array_map, batch, left_value)) = try_create_array_map( - &bounds, - &schema, - &batches, - &on_left, - &mut reservation, - config.execution.perfect_hash_join_small_build_threshold, - config.execution.perfect_hash_join_min_key_density, - null_equality, - )? { - array_map_created_count.add(1); - metrics.build_mem_used.add(array_map.size()); - - (Map::ArrayMap(array_map), batch, left_value) + // Evaluate key expressions per batch in the original collection order. + // The ArrayMap path uses forward order (matching how ArrayMap::fill_data + // iterates the concatenated key array), while the HashMap path reverses + // the batches so that FIFO chain ordering is preserved. + let mut batches = batches; + let mut values_by_batch = batches + .iter() + .map(|batch| evaluate_expressions_to_arrays(&on_left, batch)) + .collect::>>()?; + + let join_hash_map = if let Some(array_map) = try_create_array_map( + &bounds, + &batches, + &values_by_batch, + &mut reservation, + config.execution.perfect_hash_join_small_build_threshold, + config.execution.perfect_hash_join_min_key_density, + null_equality, + )? { + array_map_created_count.add(1); + metrics.build_mem_used.add(array_map.size()); + + Map::ArrayMap(array_map) + } else { + // Reverse batches and values for FIFO hash-map chain ordering + // (last collected batch gets the lowest offsets so that the + // chain head points to the earliest matching row). 
+ batches.reverse(); + values_by_batch.reverse(); + + // Estimation of memory size, required for hashtable, prior to allocation. + // Final result can be verified using `RawTable.allocation_info()` + let fixed_size_u32 = size_of::(); + let fixed_size_u64 = size_of::(); + + // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the + // `u64` indice variant + // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown + let mut hashmap: Box = if num_rows > u32::MAX as usize { + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU64::with_capacity(num_rows)) } else { - // Estimation of memory size, required for hashtable, prior to allocation. - // Final result can be verified using `RawTable.allocation_info()` - let fixed_size_u32 = size_of::(); - let fixed_size_u64 = size_of::(); - - // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the - // `u64` indice variant - // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown - let mut hashmap: Box = if num_rows > u32::MAX as usize { - let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU64::with_capacity(num_rows)) - } else { - let estimated_hashtable_size = - estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU32::with_capacity(num_rows)) - }; - - let mut hashes_buffer = Vec::new(); - let mut offset = 0; - - let batches_iter = batches.iter().rev(); - - // Updating hashmap starting from the last batch - for 
batch in batches_iter.clone() { - hashes_buffer.clear(); - hashes_buffer.resize(batch.num_rows(), 0); - update_hash( - &on_left, - batch, - &mut *hashmap, - offset, - &random_state, - &mut hashes_buffer, - 0, - true, - )?; - offset += batch.num_rows(); - } + let estimated_hashtable_size = + estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU32::with_capacity(num_rows)) + }; - // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + + // Updating hashmap in the same logical order used by the stored build-side batches + for (batch, values) in batches.iter().zip(values_by_batch.iter()) { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash_with_values( + values, + batch.num_rows(), + &mut *hashmap, + offset, + &random_state, + &mut hashes_buffer, + 0, + true, + )?; + offset += batch.num_rows(); + } - let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + Map::HashMap(hashmap) + }; - (Map::HashMap(hashmap), batch, left_values) - }; + // Compute batch offsets from the final batches order (forward for ArrayMap, + // reversed for HashMap) so that locate_record_batch can map global row + // indices back to (batch_index, local_index). 
+ let mut batch_offsets = Vec::with_capacity(batches.len() + 1); + batch_offsets.push(0); + for batch in &batches { + let next_offset = + batch_offsets.last().copied().unwrap_or_default() + batch.num_rows(); + batch_offsets.push(next_offset); + } // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { - let bitmap_size = bit_util::ceil(batch.num_rows(), 8); + let bitmap_size = bit_util::ceil(num_rows, 8); reservation.try_grow(bitmap_size)?; metrics.build_mem_used.add(bitmap_size); - let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows()); + let mut bitmap_buffer = BooleanBufferBuilder::new(num_rows); bitmap_buffer.append_n(num_rows, false); bitmap_buffer } else { @@ -2054,12 +2113,14 @@ async fn collect_left_input( // If the build side is small enough we can use IN list pushdown. // If it's too big we fall back to pushing down a reference to the hash table. // See `PushdownStrategy` for more details. - let estimated_size = left_values + let estimated_size = values_by_batch .iter() + .flatten() .map(|arr| arr.get_array_memory_size()) .sum::(); - if left_values.is_empty() - || left_values[0].is_empty() + if values_by_batch.is_empty() + || values_by_batch[0].is_empty() + || values_by_batch[0][0].is_empty() || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size || map.num_of_distinct_key() > config @@ -2067,7 +2128,9 @@ async fn collect_left_input( .hash_join_inlist_pushdown_max_distinct_values { PushdownStrategy::Map(Arc::clone(&map)) - } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? { + } else if let Some(in_list_values) = + build_struct_inlist_values(&concat_values_by_column(&values_by_batch)?)? 
+ { PushdownStrategy::InList(in_list_values) } else { PushdownStrategy::Map(Arc::clone(&map)) @@ -2080,8 +2143,10 @@ async fn collect_left_input( let data = JoinLeftData { map, - batch, - values: left_values, + schema, + batches, + batch_offsets, + values: values_by_batch, visited_indices_bitmap: Mutex::new(visited_indices_bitmap), probe_threads_counter: AtomicUsize::new(probe_threads_count), _reservation: reservation, @@ -2143,7 +2208,8 @@ mod tests { }; use arrow::array::{ - Date32Array, Int32Array, Int64Array, StructArray, UInt32Array, UInt64Array, + Date32Array, FixedSizeListArray, Int32Array, Int64Array, NullArray, StructArray, + UInt32Array, UInt64Array, }; use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field}; @@ -2234,6 +2300,31 @@ mod tests { TestMemoryExec::try_new_exec(&[vec![batch]], schema, None).unwrap() } + fn build_fixed_size_list_null_batch( + id_name: &str, + value_name: &str, + start: usize, + len: usize, + value_length: i32, + ) -> RecordBatch { + let value_field = Arc::new(Field::new("item", DataType::Null, true)); + let schema = Arc::new(Schema::new(vec![ + Field::new(id_name, DataType::UInt32, false), + Field::new( + value_name, + DataType::FixedSizeList(Arc::clone(&value_field), value_length), + false, + ), + ])); + let ids = UInt32Array::from_iter_values((start as u32)..((start + len) as u32)); + let values = NullArray::new(len * value_length as usize); + let fixed_size_list = + FixedSizeListArray::new(value_field, value_length, Arc::new(values), None); + + RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(fixed_size_list)]) + .unwrap() + } + fn join( left: Arc, right: Arc, @@ -3065,6 +3156,372 @@ mod tests { assert_phj_used(&metrics, use_perfect_hash_join_as_possible); } + #[tokio::test] + async fn join_full_fixed_size_list_high_build_index_with_fetch() -> Result<()> { + const VALUE_LENGTH: i32 = 1000; + const CHUNK_SIZE: usize = 1_000_000; + + let row_count = (u32::MAX as usize / VALUE_LENGTH as usize) + 1; + let 
match_id = (row_count - 1) as u32; + let task_ctx = prepare_task_ctx(1, false); + + let left_schema = Arc::new(Schema::new(vec![ + Field::new("id_left", DataType::UInt32, false), + Field::new( + "vec_left", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Null, true)), + VALUE_LENGTH, + ), + false, + ), + ])); + let mut left_batches = Vec::new(); + let mut start = 0; + while start < row_count { + let len = (row_count - start).min(CHUNK_SIZE); + left_batches.push(build_fixed_size_list_null_batch( + "id_left", + "vec_left", + start, + len, + VALUE_LENGTH, + )); + start += len; + } + let left = TestMemoryExec::try_new_exec( + &[left_batches], + Arc::clone(&left_schema), + None, + )?; + + let right_schema = Arc::new(Schema::new(vec![Field::new( + "id_right", + DataType::UInt32, + false, + )])); + let right_batch = RecordBatch::try_new( + Arc::clone(&right_schema), + vec![Arc::new(UInt32Array::from(vec![match_id]))], + )?; + let right = + TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?; + + let on = vec![( + Arc::new(Column::new_with_schema("id_left", &left.schema())?) as _, + Arc::new(Column::new_with_schema("id_right", &right.schema())?) as _, + )]; + + let join = join( + left, + right, + on, + &JoinType::Full, + NullEquality::NullEqualsNothing, + )? 
+ .builder() + .with_fetch(Some(1)) + .build()?; + + let batches = common::collect(join.execute(0, task_ctx)?).await?; + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 1); + + let left_ids = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(left_ids.value(0), match_id); + assert!(!batches[0].column(1).is_null(0)); + let right_ids = batches[0] + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(right_ids.value(0), match_id); + + Ok(()) + } + + #[tokio::test] + async fn join_full_fixed_size_list_unmatched_rows() -> Result<()> { + const VALUE_LENGTH: i32 = 3; + + let task_ctx = prepare_task_ctx(1, false); + let left_schema = Arc::new(Schema::new(vec![ + Field::new("id_left", DataType::UInt32, false), + Field::new( + "vec_left", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Null, true)), + VALUE_LENGTH, + ), + false, + ), + ])); + let left = TestMemoryExec::try_new_exec( + &[vec![ + build_fixed_size_list_null_batch( + "id_left", + "vec_left", + 0, + 2, + VALUE_LENGTH, + ), + build_fixed_size_list_null_batch( + "id_left", + "vec_left", + 2, + 2, + VALUE_LENGTH, + ), + ]], + Arc::clone(&left_schema), + None, + )?; + + let right_schema = Arc::new(Schema::new(vec![Field::new( + "id_right", + DataType::UInt32, + false, + )])); + let right_batch = RecordBatch::try_new( + Arc::clone(&right_schema), + vec![Arc::new(UInt32Array::from(vec![3]))], + )?; + let right = + TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?; + + let on = vec![( + Arc::new(Column::new_with_schema("id_left", &left.schema())?) as _, + Arc::new(Column::new_with_schema("id_right", &right.schema())?) 
as _, + )]; + + let join = join( + left, + right, + on, + &JoinType::Full, + NullEquality::NullEqualsNothing, + )?; + + let batches = common::collect(join.execute(0, task_ctx)?).await?; + let total_rows = batches.iter().map(RecordBatch::num_rows).sum::(); + assert_eq!(total_rows, 4); + + let mut left_ids = Vec::new(); + let mut matched_right_ids = Vec::new(); + let mut right_nulls = 0; + for batch in &batches { + let left_ids_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let right_ids_array = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + for row in 0..batch.num_rows() { + left_ids.push(left_ids_array.value(row)); + assert!(!batch.column(1).is_null(row)); + if right_ids_array.is_null(row) { + right_nulls += 1; + } else { + matched_right_ids.push(right_ids_array.value(row)); + } + } + } + + left_ids.sort_unstable(); + matched_right_ids.sort_unstable(); + assert_eq!(left_ids, vec![0, 1, 2, 3]); + assert_eq!(matched_right_ids, vec![3]); + assert_eq!(right_nulls, 3); + + Ok(()) + } + + /// Tests FULL join with a join filter on a schema that includes + /// FixedSizeList columns, exercising the + /// `apply_join_filter_to_indices_from_batches` path with multi-batch + /// build-side data. + #[tokio::test] + async fn join_full_fixed_size_list_with_filter() -> Result<()> { + const VALUE_LENGTH: i32 = 3; + + let task_ctx = prepare_task_ctx(1, false); + + // Build-side: two batches with (id, fsl, score) columns. 
+ // Batch 0: ids 0,1 scores 10,20 + // Batch 1: ids 2,3 scores 30,5 + let fsl_field = Arc::new(Field::new("item", DataType::Null, true)); + let left_schema = Arc::new(Schema::new(vec![ + Field::new("id_left", DataType::UInt32, false), + Field::new( + "vec_left", + DataType::FixedSizeList(Arc::clone(&fsl_field), VALUE_LENGTH), + false, + ), + Field::new("score_left", DataType::Int32, false), + ])); + + fn build_left_batch( + schema: &Arc, + fsl_field: &Arc, + ids: Vec, + scores: Vec, + value_length: i32, + ) -> RecordBatch { + let len = ids.len(); + let fsl = FixedSizeListArray::new( + Arc::clone(fsl_field), + value_length, + Arc::new(NullArray::new(len * value_length as usize)), + None, + ); + RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(UInt32Array::from(ids)), + Arc::new(fsl), + Arc::new(Int32Array::from(scores)), + ], + ) + .unwrap() + } + + let left = TestMemoryExec::try_new_exec( + &[vec![ + build_left_batch( + &left_schema, + &fsl_field, + vec![0, 1], + vec![10, 20], + VALUE_LENGTH, + ), + build_left_batch( + &left_schema, + &fsl_field, + vec![2, 3], + vec![30, 5], + VALUE_LENGTH, + ), + ]], + Arc::clone(&left_schema), + None, + )?; + + // Probe-side: ids 1,2,3 with scores 15,15,15 + let right_schema = Arc::new(Schema::new(vec![ + Field::new("id_right", DataType::UInt32, false), + Field::new("score_right", DataType::Int32, false), + ])); + let right_batch = RecordBatch::try_new( + Arc::clone(&right_schema), + vec![ + Arc::new(UInt32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![15, 15, 15])), + ], + )?; + let right = TestMemoryExec::try_new_exec( + &[vec![right_batch]], + Arc::clone(&right_schema), + None, + )?; + + let on = vec![( + Arc::new(Column::new_with_schema("id_left", &left.schema())?) as _, + Arc::new(Column::new_with_schema("id_right", &right.schema())?) 
 as _, + )]; + + // Filter: score_left > score_right + // Match id=1: score 20 > 15 → kept + // Match id=2: score 30 > 15 → kept + // Match id=3: score 5 > 15 → filtered out + let filter = JoinFilter::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("score_left", 0)), + Operator::Gt, + Arc::new(Column::new("score_right", 1)), + )), + vec![ + ColumnIndex { + index: 2, + side: JoinSide::Left, + }, + ColumnIndex { + index: 1, + side: JoinSide::Right, + }, + ], + Arc::new(Schema::new(vec![ + Field::new("score_left", DataType::Int32, false), + Field::new("score_right", DataType::Int32, false), + ])), + ); + + let join = join_with_filter( + left, + right, + on, + filter, + &JoinType::Full, + NullEquality::NullEqualsNothing, + )?; + + let batches = common::collect(join.execute(0, task_ctx)?).await?; + let total_rows = batches.iter().map(RecordBatch::num_rows).sum::(); + + // Expected: 5 rows total + // id=0: unmatched left → (0, fsl, 10, NULL, NULL) + // id=1: matched, filter passes → (1, fsl, 20, 1, 15) + // id=2: matched, filter passes → (2, fsl, 30, 2, 15) + // id=3: unmatched left (filter) → (3, fsl, 5, NULL, NULL) + // id=3: unmatched right(filter) → (NULL, NULL, NULL, 3, 15) + // (no unmatched right for id=1,2 since filter passed) + assert_eq!(total_rows, 5); + + let mut matched_pairs = Vec::new(); + let mut left_only_ids = Vec::new(); + let mut right_only_ids = Vec::new(); + for batch in &batches { + let left_ids = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let right_ids = batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + for row in 0..batch.num_rows() { + let left_null = left_ids.is_null(row); + let right_null = right_ids.is_null(row); + if !left_null && !right_null { + matched_pairs.push((left_ids.value(row), right_ids.value(row))); + } else if !left_null { + left_only_ids.push(left_ids.value(row)); + } else { + right_only_ids.push(right_ids.value(row)); + } + } + } + + matched_pairs.sort_unstable(); + 
left_only_ids.sort_unstable(); + right_only_ids.sort_unstable(); + + assert_eq!(matched_pairs, vec![(1, 1), (2, 2)]); + assert_eq!(left_only_ids, vec![0, 3]); + assert_eq!(right_only_ids, vec![3]); + + Ok(()) + } + #[apply(hash_join_exec_configs)] #[tokio::test] async fn join_left_empty_right( @@ -4338,12 +4795,15 @@ mod tests { key_column.evaluate(&right)?.into_array(right.num_rows())?; let mut hashes_buffer = vec![0; right.num_rows()]; create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?; + let left_keys_values = vec![vec![left_keys_values]]; + let left_batch_offsets = vec![0, left.num_rows()]; let mut probe_indices_buffer = Vec::new(); let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &left_keys_values, + &left_batch_offsets, &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, @@ -4399,12 +4859,15 @@ mod tests { key_column.evaluate(&right)?.into_array(right.num_rows())?; let mut hashes_buffer = vec![0; right.num_rows()]; create_hashes([&right_keys_values], &random_state, &mut hashes_buffer)?; + let left_keys_values = vec![vec![left_keys_values]]; + let left_batch_offsets = vec![0, left.num_rows()]; let mut probe_indices_buffer = Vec::new(); let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &left_keys_values, + &left_batch_offsets, &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index ab630920184d..490522949829 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -33,15 +33,17 @@ use crate::joins::hash_join::shared_bounds::{ PartitionBounds, PartitionBuildData, SharedBuildAccumulator, }; use crate::joins::utils::{ - OnceFut, equal_rows_arr, 
get_final_indices_from_shared_bitmap, + OnceFut, equal_rows_arr_from_batches, get_final_indices_from_shared_bitmap, + value_is_null_from_batches, }; use crate::{ RecordBatchStream, SendableRecordBatchStream, handle_state, hash_utils::create_hashes, joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, - StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, - build_batch_empty_build_side, build_batch_from_indices, + StatefulStreamResult, adjust_indices_by_join_type, + apply_join_filter_to_indices_from_batches, build_batch_empty_build_side, + build_batch_from_indices, build_batch_from_indices_from_batches, need_produce_result_in_final, }, }; @@ -285,7 +287,8 @@ impl RecordBatchStream for HashJoinStream { #[expect(clippy::too_many_arguments)] pub(super) fn lookup_join_hashmap( build_hashmap: &dyn JoinHashMapType, - build_side_values: &[ArrayRef], + build_side_values: &[Vec], + build_side_batch_offsets: &[usize], probe_side_values: &[ArrayRef], null_equality: NullEquality, hashes_buffer: &[u64], @@ -309,10 +312,11 @@ pub(super) fn lookup_join_hashmap( // TODO: optimize equal_rows_arr to avoid allocation of intermediate arrays // https://github.com/apache/datafusion/issues/12131 - let (build_indices, probe_indices) = equal_rows_arr( + let (build_indices, probe_indices) = equal_rows_arr_from_batches( &build_indices_unfiltered, &probe_indices_unfiltered, build_side_values, + build_side_batch_offsets, probe_side_values, null_equality, )?; @@ -647,9 +651,10 @@ impl HashJoinStream { let is_empty = build_side.left_data.map().is_empty(); if is_empty && self.filter.is_none() { + let empty_build_batch = build_side.left_data.empty_batch(); let result = build_batch_empty_build_side( &self.schema, - build_side.left_data.batch(), + &empty_build_batch, &state.batch, &self.column_indices, self.join_type, @@ -667,6 +672,7 @@ impl HashJoinStream { Map::HashMap(map) => lookup_join_hashmap( map.as_ref(), build_side.left_data.values(), 
+ build_side.left_data.batch_offsets(), &state.values, self.null_equality, &self.hashes_buffer, @@ -705,8 +711,10 @@ impl HashJoinStream { // apply join filter if exists let (left_indices, right_indices) = if let Some(filter) = &self.filter { - apply_join_filter_to_indices( - build_side.left_data.batch(), + apply_join_filter_to_indices_from_batches( + build_side.left_data.batches(), + build_side.left_data.schema(), + build_side.left_data.batch_offsets(), &state.batch, left_indices, right_indices, @@ -767,23 +775,32 @@ impl HashJoinStream { )?; // Build output batch and push to coalescer - let (build_batch, probe_batch, join_side) = - if self.join_type == JoinType::RightMark { - (&state.batch, build_side.left_data.batch(), JoinSide::Right) - } else { - (build_side.left_data.batch(), &state.batch, JoinSide::Left) - }; - - let batch = build_batch_from_indices( - &self.schema, - build_batch, - probe_batch, - &left_indices, - &right_indices, - &self.column_indices, - join_side, - self.join_type, - )?; + let batch = if self.join_type == JoinType::RightMark { + let empty_build_batch = build_side.left_data.empty_batch(); + build_batch_from_indices( + &self.schema, + &state.batch, + &empty_build_batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Right, + self.join_type, + )? + } else { + build_batch_from_indices_from_batches( + &self.schema, + build_side.left_data.batches(), + build_side.left_data.schema(), + build_side.left_data.batch_offsets(), + &state.batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + self.join_type, + )? 
+ }; let push_status = self.output_buffer.push_batch(batch)?; @@ -859,21 +876,23 @@ impl HashJoinStream { .probe_side_non_empty .load(Ordering::Relaxed) { - // Since null_aware validation ensures single column join, we only check the first column - let build_key_column = &build_side.left_data.values()[0]; - // Filter out indices where the key is NULL - let filtered_indices: Vec = left_side - .iter() - .filter_map(|idx| { - let idx_usize = idx.unwrap() as usize; - if build_key_column.is_null(idx_usize) { - None // Skip rows with NULL keys - } else { - Some(idx.unwrap()) - } - }) - .collect(); + let mut filtered_indices = Vec::with_capacity(left_side.len()); + for idx in left_side.iter() { + let idx = idx.ok_or_else(|| { + internal_datafusion_err!( + "unexpected NULL build-side index for null-aware anti join" + ) + })?; + if !value_is_null_from_batches( + build_side.left_data.batch_offsets(), + build_side.left_data.values(), + 0, + idx as usize, + )? { + filtered_indices.push(idx); + } + } left_side = UInt64Array::from(filtered_indices); @@ -893,9 +912,11 @@ impl HashJoinStream { // Push final unmatched indices to output buffer if !left_side.is_empty() { let empty_right_batch = RecordBatch::new_empty(self.right.schema()); - let batch = build_batch_from_indices( + let batch = build_batch_from_indices_from_batches( &self.schema, - build_side.left_data.batch(), + build_side.left_data.batches(), + build_side.left_data.schema(), + build_side.left_data.batch_offsets(), &empty_right_batch, &left_side, &right_side, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 3130134e253d..fb2f71e23a76 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -42,18 +42,19 @@ pub use crate::joins::{JoinOn, JoinOnRef}; use arrow::array::{ Array, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray, RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, 
UInt64Array, - builder::UInt64Builder, downcast_array, new_null_array, + builder::UInt64Builder, downcast_array, new_empty_array, new_null_array, }; use arrow::array::{ ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, - Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array, - Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray, - StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, + Decimal128Array, FixedSizeBinaryArray, FixedSizeListArray, Float32Array, + Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, + LargeStringArray, StringArray, StringViewArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + UInt8Array, UInt16Array, }; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::kernels::cmp::eq; -use arrow::compute::{self, FilterBuilder, and, take}; +use arrow::compute::{self, FilterBuilder, and, interleave, take}; use arrow::datatypes::{ ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type, }; @@ -65,7 +66,7 @@ use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; use datafusion_common::{ DataFusionError, JoinSide, JoinType, NullEquality, Result, SharedResult, - not_impl_err, plan_err, + internal_err, not_impl_err, plan_err, }; use datafusion_expr::Operator; use datafusion_expr::interval_arithmetic::Interval; @@ -981,6 +982,83 @@ pub(crate) fn apply_join_filter_to_indices( )) } +#[expect(clippy::too_many_arguments)] +pub(crate) fn apply_join_filter_to_indices_from_batches( + build_batches: &[RecordBatch], + build_schema: &Schema, + build_batch_offsets: &[usize], + probe_batch: &RecordBatch, + build_indices: UInt64Array, + probe_indices: UInt32Array, + filter: &JoinFilter, + build_side: JoinSide, + max_intermediate_size: Option, + 
join_type: JoinType, +) -> Result<(UInt64Array, UInt32Array)> { + if build_indices.is_empty() && probe_indices.is_empty() { + return Ok((build_indices, probe_indices)); + }; + + let filter_result = if let Some(max_size) = max_intermediate_size { + let mut filter_results = + Vec::with_capacity(build_indices.len().div_ceil(max_size)); + + for i in (0..build_indices.len()).step_by(max_size) { + let end = min(build_indices.len(), i + max_size); + let len = end - i; + let intermediate_batch = build_batch_from_indices_from_batches( + filter.schema(), + build_batches, + build_schema, + build_batch_offsets, + probe_batch, + &build_indices.slice(i, len), + &probe_indices.slice(i, len), + filter.column_indices(), + build_side, + join_type, + )?; + let filter_result = filter + .expression() + .evaluate(&intermediate_batch)? + .into_array(intermediate_batch.num_rows())?; + filter_results.push(filter_result); + } + + let filter_refs: Vec<&dyn Array> = + filter_results.iter().map(|a| a.as_ref()).collect(); + + compute::concat(&filter_refs)? + } else { + let intermediate_batch = build_batch_from_indices_from_batches( + filter.schema(), + build_batches, + build_schema, + build_batch_offsets, + probe_batch, + &build_indices, + &probe_indices, + filter.column_indices(), + build_side, + join_type, + )?; + + filter + .expression() + .evaluate(&intermediate_batch)? + .into_array(intermediate_batch.num_rows())? + }; + + let mask = as_boolean_array(&filter_result)?; + + let left_filtered = compute::filter(&build_indices, mask)?; + let right_filtered = compute::filter(&probe_indices, mask)?; + Ok(( + downcast_array(left_filtered.as_ref()), + downcast_array(right_filtered.as_ref()), + )) +} + /// Creates a [RecordBatch] with zero columns but the given row count. /// Used when a join has an empty projection (e.g. `SELECT count(1) ...`). 
fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result { @@ -1051,6 +1129,79 @@ pub(crate) fn build_batch_from_indices( Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) } +#[expect(clippy::too_many_arguments)] +pub(crate) fn build_batch_from_indices_from_batches( + schema: &Schema, + build_batches: &[RecordBatch], + build_schema: &Schema, + build_batch_offsets: &[usize], + probe_batch: &RecordBatch, + build_indices: &UInt64Array, + probe_indices: &UInt32Array, + column_indices: &[ColumnIndex], + build_side: JoinSide, + join_type: JoinType, +) -> Result { + if schema.fields().is_empty() { + let row_count = match join_type { + JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(), + _ => build_indices.len(), + }; + return new_empty_schema_batch(schema, row_count); + } + + let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); + let mut prepared_build_indices = None; + + for column_index in column_indices { + let array = if column_index.side == JoinSide::None { + Arc::new(compute::is_not_null(probe_indices)?) + } else if column_index.side == build_side { + if build_batches.is_empty() + || build_indices.null_count() == build_indices.len() + { + assert_eq!(build_indices.null_count(), build_indices.len()); + new_null_array( + build_schema.field(column_index.index).data_type(), + build_indices.len(), + ) + } else { + let build_arrays = build_batches + .iter() + .map(|batch| Arc::clone(batch.column(column_index.index))) + .collect::>(); + if prepared_build_indices.is_none() { + prepared_build_indices = + Some(prepare_take_indices(build_batch_offsets, build_indices)?); + } + let prepared_build_indices = + prepared_build_indices.as_mut().ok_or_else(|| { + DataFusionError::Internal( + "missing prepared build indices".to_string(), + ) + })?; + take_array_from_batches_with_prepared_indices( + &build_arrays, + build_indices, + prepared_build_indices, + )? 
+ } + } else { + let array = probe_batch.column(column_index.index); + if array.is_empty() || probe_indices.null_count() == probe_indices.len() { + assert_eq!(probe_indices.null_count(), probe_indices.len()); + new_null_array(array.data_type(), probe_indices.len()) + } else { + take(array.as_ref(), probe_indices, None)? + } + }; + + columns.push(array); + } + + Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?) +} + /// Returns a new [RecordBatch] resulting of a join where the build/left side is empty. /// The resulting batch has [Schema] `schema`. pub(crate) fn build_batch_empty_build_side( @@ -1743,11 +1894,34 @@ pub fn update_hash( // evaluate the keys let keys_values = evaluate_expressions_to_arrays(on, batch)?; + update_hash_with_values( + &keys_values, + batch.num_rows(), + hash_map, + offset, + random_state, + hashes_buffer, + deleted_offset, + fifo_hashmap, + ) +} + +#[expect(clippy::too_many_arguments)] +pub(crate) fn update_hash_with_values( + keys_values: &[ArrayRef], + num_rows: usize, + hash_map: &mut dyn JoinHashMapType, + offset: usize, + random_state: &RandomState, + hashes_buffer: &mut [u64], + deleted_offset: usize, + fifo_hashmap: bool, +) -> Result<()> { // calculate the hash values - let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?; + let hash_values = create_hashes(keys_values, random_state, hashes_buffer)?; // For usual JoinHashmap, the implementation is void. 
- hash_map.extend_zero(batch.num_rows()); + hash_map.extend_zero(num_rows); // Updating JoinHashMap from hash values iterator let hash_values_iter = hash_values @@ -1764,6 +1938,452 @@ pub fn update_hash( Ok(()) } +fn contains_fixed_size_list(data_type: &DataType) -> bool { + match data_type { + DataType::FixedSizeList(_, _) => true, + DataType::List(field) + | DataType::ListView(field) + | DataType::LargeList(field) + | DataType::LargeListView(field) + | DataType::Map(field, _) => contains_fixed_size_list(field.data_type()), + DataType::Struct(fields) => fields + .iter() + .any(|field| contains_fixed_size_list(field.data_type())), + DataType::Union(fields, _) => fields + .iter() + .any(|(_, field)| contains_fixed_size_list(field.data_type())), + DataType::Dictionary(_, value_type) => contains_fixed_size_list(value_type), + DataType::RunEndEncoded(_, values_field) => { + contains_fixed_size_list(values_field.data_type()) + } + _ => false, + } +} + +fn locate_record_batch( + batch_offsets: &[usize], + row_index: usize, +) -> Result<(usize, usize)> { + let total_rows = batch_offsets + .last() + .copied() + .ok_or_else(|| DataFusionError::Internal("missing batch offsets".to_string()))?; + if row_index >= total_rows { + return internal_err!( + "row index {row_index} is out of bounds for build side of size {total_rows}" + ); + } + + let upper_bound = batch_offsets.partition_point(|offset| *offset <= row_index); + let batch_index = upper_bound - 1; + Ok((batch_index, row_index - batch_offsets[batch_index])) +} + +#[derive(Debug)] +enum CachedMaxLocalIndex { + Unknown, + Known(Option), +} + +#[derive(Debug)] +struct SingleSourceTakeIndices { + total_rows: usize, + max_local_index: CachedMaxLocalIndex, +} + +#[derive(Debug)] +struct ResolvedSingleBatchTakeIndices { + batch_index: usize, + max_local_index: Option, + local_indices: Option, +} + +#[derive(Debug)] +struct ResolvedTakeIndices { + source_count: usize, + interleave_indices: Vec<(usize, usize)>, + single_batch: 
Option, +} + +#[derive(Debug)] +enum PreparedTakeIndices { + SingleSource(SingleSourceTakeIndices), + Resolved(ResolvedTakeIndices), +} + +fn max_index_in_bounds(indices: &UInt64Array, total_rows: usize) -> Result> { + let mut max_index: Option = None; + + for index in indices.iter().flatten() { + let index_usize = usize::try_from(index).map_err(|_| { + DataFusionError::Internal(format!( + "row index {index} is out of bounds for build side of size {total_rows}" + )) + })?; + if index_usize >= total_rows { + return internal_err!( + "row index {index} is out of bounds for build side of size {total_rows}" + ); + } + max_index = Some(match max_index { + Some(max_index) => max_index.max(index), + None => index, + }); + } + + Ok(max_index) +} + +fn prepare_take_indices( + batch_offsets: &[usize], + indices: &UInt64Array, +) -> Result { + let source_count = batch_offsets + .len() + .checked_sub(1) + .ok_or_else(|| DataFusionError::Internal("missing batch offsets".to_string()))?; + + if source_count == 0 { + return internal_err!("missing build-side arrays"); + } + + if source_count == 1 { + return Ok(PreparedTakeIndices::SingleSource(SingleSourceTakeIndices { + total_rows: batch_offsets[1], + max_local_index: CachedMaxLocalIndex::Unknown, + })); + } + + Ok(PreparedTakeIndices::Resolved(resolve_take_indices( + batch_offsets, + indices, + )?)) +} + +fn resolve_take_indices( + batch_offsets: &[usize], + indices: &UInt64Array, +) -> Result { + let source_count = batch_offsets + .len() + .checked_sub(1) + .ok_or_else(|| DataFusionError::Internal("missing batch offsets".to_string()))?; + let null_source_index = source_count; + + let mut interleave_indices = Vec::with_capacity(indices.len()); + let mut max_local_index: Option = None; + let mut single_batch_index = None; + let mut single_batch = true; + + for index in indices.iter() { + match index { + Some(index) => { + let (batch_index, local_index) = + locate_record_batch(batch_offsets, index as usize)?; + 
interleave_indices.push((batch_index, local_index)); + + let local_index = local_index as u64; + max_local_index = Some(match max_local_index { + Some(max_local_index) => max_local_index.max(local_index), + None => local_index, + }); + + if let Some(existing_batch_index) = single_batch_index { + single_batch &= existing_batch_index == batch_index; + } else { + single_batch_index = Some(batch_index); + } + } + None => interleave_indices.push((null_source_index, 0)), + } + } + + let single_batch = if single_batch { + single_batch_index.map(|batch_index| ResolvedSingleBatchTakeIndices { + batch_index, + max_local_index, + local_indices: None, + }) + } else { + None + }; + + Ok(ResolvedTakeIndices { + source_count, + interleave_indices, + single_batch, + }) +} + +fn build_single_batch_local_indices( + interleave_indices: &[(usize, usize)], + indices: &UInt64Array, +) -> UInt64Array { + let mut local_indices = UInt64Builder::with_capacity(indices.len()); + for (index, (_, local_index)) in indices.iter().zip(interleave_indices.iter()) { + match index { + Some(_) => local_indices.append_value(*local_index as u64), + None => local_indices.append_null(), + } + } + local_indices.finish() +} + +fn can_take_single_batch_array(array: &dyn Array, max_local_index: Option) -> bool { + if !contains_fixed_size_list(array.data_type()) { + return true; + } + + let Some(list) = array.as_any().downcast_ref::() else { + return false; + }; + + if contains_fixed_size_list(&list.value_type()) { + return false; + } + + let Some(max_local_index) = max_local_index else { + return true; + }; + + let Some(row_count) = max_local_index.checked_add(1) else { + return false; + }; + let Ok(value_length) = u64::try_from(list.value_length()) else { + return false; + }; + + row_count + .checked_mul(value_length) + .is_some_and(|value_count| value_count <= u32::MAX as u64) +} + +fn single_source_max_local_index( + single_source: &mut SingleSourceTakeIndices, + indices: &UInt64Array, +) -> Result> { + if 
let CachedMaxLocalIndex::Known(max_local_index) = single_source.max_local_index { + return Ok(max_local_index); + } + + let max_local_index = max_index_in_bounds(indices, single_source.total_rows)?; + single_source.max_local_index = CachedMaxLocalIndex::Known(max_local_index); + Ok(max_local_index) +} + +fn interleave_single_source_array( + array: &dyn Array, + indices: &UInt64Array, +) -> Result { + if indices.is_empty() { + return Ok(new_empty_array(array.data_type())); + } + + let mut sources = vec![array]; + let null_array = + (indices.null_count() > 0).then(|| new_null_array(array.data_type(), 1)); + let null_source = null_array.as_ref().map(|null_array| { + let index = sources.len(); + sources.push(null_array.as_ref()); + index + }); + + let mut interleave_indices = Vec::with_capacity(indices.len()); + for index in indices.iter() { + match index { + Some(index) => interleave_indices.push((0, index as usize)), + None => interleave_indices.push((null_source.expect("null source"), 0)), + } + } + + Ok(interleave(&sources, &interleave_indices)?) 
+} + +fn take_array_from_batches_with_prepared_indices( + arrays: &[ArrayRef], + indices: &UInt64Array, + prepared_indices: &mut PreparedTakeIndices, +) -> Result { + let first_array = arrays.first().ok_or_else(|| { + DataFusionError::Internal("missing build-side arrays".to_string()) + })?; + + if indices.is_empty() { + return Ok(new_empty_array(first_array.data_type())); + } + + match prepared_indices { + PreparedTakeIndices::SingleSource(single_source) => { + if arrays.len() != 1 { + return internal_err!( + "single-source indices expect 1 build-side array but found {}", + arrays.len() + ); + } + let array = arrays[0].as_ref(); + if !contains_fixed_size_list(array.data_type()) { + return Ok(take(array, indices, None)?); + } + + let max_local_index = single_source_max_local_index(single_source, indices)?; + if can_take_single_batch_array(array, max_local_index) { + return Ok(take(array, indices, None)?); + } + + interleave_single_source_array(array, indices) + } + PreparedTakeIndices::Resolved(resolved_indices) => { + if resolved_indices.source_count != arrays.len() { + return internal_err!( + "resolved indices expect {} build-side arrays but found {}", + resolved_indices.source_count, + arrays.len() + ); + } + + let single_batch_index = + resolved_indices + .single_batch + .as_ref() + .and_then(|single_batch| { + arrays + .get(single_batch.batch_index) + .filter(|array| { + can_take_single_batch_array( + array.as_ref(), + single_batch.max_local_index, + ) + }) + .map(|_| single_batch.batch_index) + }); + + if let Some(batch_index) = single_batch_index { + let array = arrays.get(batch_index).ok_or_else(|| { + DataFusionError::Internal( + "invalid single-batch take index".to_string(), + ) + })?; + let ResolvedTakeIndices { + interleave_indices, + single_batch, + .. 
+ } = resolved_indices; + let local_indices = if let Some(single_batch) = single_batch.as_mut() { + if single_batch.local_indices.is_none() { + single_batch.local_indices = Some( + build_single_batch_local_indices(interleave_indices, indices), + ); + } + single_batch.local_indices.as_ref().ok_or_else(|| { + DataFusionError::Internal( + "missing local indices for single-batch take".to_string(), + ) + })? + } else { + return internal_err!("missing single-batch indices for take"); + }; + return Ok(take(array.as_ref(), local_indices, None)?); + } + + let mut sources: Vec<&dyn Array> = + arrays.iter().map(|array| array.as_ref()).collect(); + let null_array = (indices.null_count() > 0) + .then(|| new_null_array(first_array.data_type(), 1)); + if let Some(array) = null_array.as_ref() { + sources.push(array.as_ref()); + } + + Ok(interleave(&sources, &resolved_indices.interleave_indices)?) + } + } +} + +#[cfg(test)] +fn take_array_from_batches( + arrays: &[ArrayRef], + batch_offsets: &[usize], + indices: &UInt64Array, +) -> Result { + let mut prepared_indices = prepare_take_indices(batch_offsets, indices)?; + take_array_from_batches_with_prepared_indices(arrays, indices, &mut prepared_indices) +} + +pub(crate) fn value_is_null_from_batches( + batch_offsets: &[usize], + values_by_batch: &[Vec], + value_index: usize, + row_index: usize, +) -> Result { + let (batch_index, local_index) = locate_record_batch(batch_offsets, row_index)?; + Ok(values_by_batch[batch_index][value_index].is_null(local_index)) +} + +pub(super) fn equal_rows_arr_from_batches( + indices_left: &UInt64Array, + indices_right: &UInt32Array, + left_arrays: &[Vec], + left_batch_offsets: &[usize], + right_arrays: &[ArrayRef], + null_equality: NullEquality, +) -> Result<(UInt64Array, UInt32Array)> { + if indices_left.is_empty() || right_arrays.is_empty() { + return Ok((Vec::::new().into(), Vec::::new().into())); + } + + let left_arrays_per_key = left_arrays + .first() + .ok_or_else(|| { + 
DataFusionError::Internal("missing build-side key arrays".to_string()) + })? + .len(); + debug_assert_eq!(left_arrays_per_key, right_arrays.len()); + + let mut prepared_left_indices = + prepare_take_indices(left_batch_offsets, indices_left)?; + let mut left_key_arrays = left_arrays + .iter() + .map(|arrays| Arc::clone(&arrays[0])) + .collect::>(); + let arr_left = take_array_from_batches_with_prepared_indices( + &left_key_arrays, + indices_left, + &mut prepared_left_indices, + )?; + let arr_right = take(right_arrays[0].as_ref(), indices_right, None)?; + + let mut equal: BooleanArray = + eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)?; + + for value_index in 1..right_arrays.len() { + left_key_arrays.clear(); + left_key_arrays.extend( + left_arrays + .iter() + .map(|arrays| Arc::clone(&arrays[value_index])), + ); + + let arr_left = take_array_from_batches_with_prepared_indices( + &left_key_arrays, + indices_left, + &mut prepared_left_indices, + )?; + let arr_right = take(right_arrays[value_index].as_ref(), indices_right, None)?; + let equal_next = + eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)?; + equal = and(&equal, &equal_next)?; + } + + let filter_builder = FilterBuilder::new(&equal).optimize().build(); + + let left_filtered = filter_builder.filter(indices_left)?; + let right_filtered = filter_builder.filter(indices_right)?; + + Ok(( + downcast_array(left_filtered.as_ref()), + downcast_array(right_filtered.as_ref()), + )) +} + pub(super) fn equal_rows_arr( indices_left: &UInt64Array, indices_right: &UInt32Array, @@ -1926,8 +2546,8 @@ mod tests { use super::*; - use arrow::array::Int32Array; - use arrow::datatypes::{DataType, Fields}; + use arrow::array::{FixedSizeListArray, Int32Array, UInt64Array}; + use arrow::datatypes::{DataType, Field, Fields}; use arrow::error::{ArrowError, Result as ArrowResult}; use datafusion_common::stats::Precision::{Absent, Exact, Inexact}; use datafusion_common::{ScalarValue, 
arrow_datafusion_err, arrow_err}; @@ -2009,6 +2629,240 @@ mod tests { assert!(matches!(root_err, _expected)) } + #[test] + fn test_contains_fixed_size_list_nested_types() { + let value_field = Arc::new(Field::new("item", DataType::Int32, false)); + let fixed_size_list = DataType::FixedSizeList(Arc::clone(&value_field), 2); + let nested_struct = DataType::Struct( + vec![ + Arc::new(Field::new("plain", DataType::Int32, false)), + Arc::new(Field::new("nested", fixed_size_list.clone(), false)), + ] + .into(), + ); + + assert!(!contains_fixed_size_list(&DataType::Int32)); + assert!(contains_fixed_size_list(&fixed_size_list)); + assert!(contains_fixed_size_list(&nested_struct)); + assert!(contains_fixed_size_list(&DataType::List(Arc::new( + Field::new("list_item", fixed_size_list, true,) + )))); + } + + #[test] + fn test_locate_record_batch_boundaries() -> Result<()> { + let offsets = vec![0, 2, 5, 9]; + + assert_eq!(locate_record_batch(&offsets, 0)?, (0, 0)); + assert_eq!(locate_record_batch(&offsets, 1)?, (0, 1)); + assert_eq!(locate_record_batch(&offsets, 2)?, (1, 0)); + assert_eq!(locate_record_batch(&offsets, 4)?, (1, 2)); + assert_eq!(locate_record_batch(&offsets, 5)?, (2, 0)); + assert_eq!(locate_record_batch(&offsets, 8)?, (2, 3)); + + assert!(locate_record_batch(&offsets, 9).is_err()); + assert!(locate_record_batch(&[], 0).is_err()); + Ok(()) + } + + #[test] + fn test_prepare_take_indices_single_source() -> Result<()> { + let prepared = prepare_take_indices(&[0, 3], &UInt64Array::from(vec![2, 0]))?; + let PreparedTakeIndices::SingleSource(single_source) = prepared else { + unreachable!("expected single-source indices"); + }; + + assert_eq!(single_source.total_rows, 3); + assert!(matches!( + single_source.max_local_index, + CachedMaxLocalIndex::Unknown + )); + Ok(()) + } + + #[test] + fn test_resolve_take_indices_single_batch_with_nulls() -> Result<()> { + let indices = UInt64Array::from(vec![Some(3), None, Some(2)]); + let resolved = resolve_take_indices(&[0, 2, 
5], &indices)?; + + assert_eq!(resolved.source_count, 2); + assert_eq!(resolved.interleave_indices, vec![(1, 1), (2, 0), (1, 0)]); + + let single_batch = resolved.single_batch.as_ref().unwrap(); + assert_eq!(single_batch.batch_index, 1); + assert_eq!(single_batch.max_local_index, Some(1)); + + let local_indices = + build_single_batch_local_indices(&resolved.interleave_indices, &indices); + assert_eq!( + local_indices, + UInt64Array::from(vec![Some(1), None, Some(0)]) + ); + + Ok(()) + } + + #[test] + fn test_can_take_single_batch_array_fixed_size_list_threshold() { + let value_field = Arc::new(Field::new("item", DataType::Int32, false)); + let list = FixedSizeListArray::new( + Arc::clone(&value_field), + 2, + Arc::new(Int32Array::from(vec![0, 1, 2, 3])), + None, + ); + + assert!(can_take_single_batch_array( + &list, + Some((u32::MAX as u64 / 2) - 1) + )); + assert!(!can_take_single_batch_array( + &list, + Some(u32::MAX as u64 / 2) + )); + + let nested_value_field = Arc::new(Field::new( + "item", + DataType::FixedSizeList(Arc::clone(&value_field), 2), + false, + )); + let nested_list = FixedSizeListArray::new( + nested_value_field, + 2, + Arc::new(FixedSizeListArray::new( + value_field, + 2, + Arc::new(Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7])), + None, + )), + None, + ); + assert!(!can_take_single_batch_array(&nested_list, Some(0))); + } + + #[test] + fn test_take_array_from_batches_multi_batch_with_nulls() -> Result<()> { + let arrays = vec![ + Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef, + Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef, + ]; + let indices = UInt64Array::from(vec![Some(3), None, Some(0), Some(2)]); + let taken = take_array_from_batches(&arrays, &[0, 2, 4], &indices)?; + let taken = taken.as_any().downcast_ref::().unwrap(); + + assert_eq!(taken.len(), 4); + assert_eq!(taken.value(0), 40); + assert!(taken.is_null(1)); + assert_eq!(taken.value(2), 10); + assert_eq!(taken.value(3), 30); + Ok(()) + } + + #[test] + fn 
test_take_array_from_batches_single_batch_with_nulls() -> Result<()> { + let arrays = vec![ + Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef, + Arc::new(Int32Array::from(vec![30, 40, 50])) as ArrayRef, + ]; + let indices = UInt64Array::from(vec![Some(3), None, Some(2), Some(4)]); + let taken = take_array_from_batches(&arrays, &[0, 2, 5], &indices)?; + let taken = taken.as_any().downcast_ref::().unwrap(); + + assert_eq!(taken.len(), 4); + assert_eq!(taken.value(0), 40); + assert!(taken.is_null(1)); + assert_eq!(taken.value(2), 30); + assert_eq!(taken.value(3), 50); + Ok(()) + } + + #[test] + fn test_take_array_from_batches_single_batch_fixed_size_list() -> Result<()> { + let value_field = Arc::new(Field::new("item", DataType::Int32, false)); + let batch = Arc::new(FixedSizeListArray::new( + Arc::clone(&value_field), + 2, + Arc::new(Int32Array::from(vec![0, 1, 10, 11, 20, 21])), + None, + )) as ArrayRef; + + let taken = take_array_from_batches( + &[batch], + &[0, 3], + &UInt64Array::from(vec![2, 1, 0]), + )?; + let taken = taken.as_any().downcast_ref::().unwrap(); + let values = taken.values(); + let values = values.as_any().downcast_ref::().unwrap(); + + assert_eq!(taken.len(), 3); + assert_eq!(values.values(), &[20, 21, 10, 11, 0, 1]); + Ok(()) + } + + #[test] + fn test_take_array_from_batches_single_batch_nested_fixed_size_list() -> Result<()> { + let value_field = Arc::new(Field::new("item", DataType::Int32, false)); + let inner = Arc::new(FixedSizeListArray::new( + Arc::clone(&value_field), + 2, + Arc::new(Int32Array::from(vec![0, 1, 10, 11, 20, 21, 30, 31])), + None, + )); + let outer = Arc::new(FixedSizeListArray::new( + Arc::new(Field::new("inner", inner.data_type().clone(), false)), + 2, + inner, + None, + )) as ArrayRef; + + let taken = + take_array_from_batches(&[outer], &[0, 2], &UInt64Array::from(vec![1, 0]))?; + let taken = taken.as_any().downcast_ref::().unwrap(); + let inner = taken + .values() + .as_any() + .downcast_ref::() + .unwrap(); + let 
values = inner.values(); + let values = values.as_any().downcast_ref::().unwrap(); + + assert_eq!(taken.len(), 2); + assert_eq!(inner.len(), 4); + assert_eq!(values.values(), &[20, 21, 30, 31, 0, 1, 10, 11]); + Ok(()) + } + + #[test] + fn test_take_array_from_batches_fixed_size_list() -> Result<()> { + let value_field = Arc::new(Field::new("item", DataType::Int32, false)); + let batch_1 = Arc::new(FixedSizeListArray::new( + Arc::clone(&value_field), + 2, + Arc::new(Int32Array::from(vec![0, 1, 10, 11])), + None, + )) as ArrayRef; + let batch_2 = Arc::new(FixedSizeListArray::new( + Arc::clone(&value_field), + 2, + Arc::new(Int32Array::from(vec![20, 21, 30, 31])), + None, + )) as ArrayRef; + + let taken = take_array_from_batches( + &[batch_1, batch_2], + &[0, 2, 4], + &UInt64Array::from(vec![2, 1, 3, 0]), + )?; + let taken = taken.as_any().downcast_ref::().unwrap(); + let values = taken.values(); + let values = values.as_any().downcast_ref::().unwrap(); + + assert_eq!(taken.len(), 4); + assert_eq!(values.values(), &[20, 21, 10, 11, 30, 31, 0, 1]); + Ok(()) + } + #[test] fn check_not_in_left() { let left = vec![Column::new("b", 0)];