diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 5ea3589f22b3e..df8b53254d084 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -19,14 +19,17 @@ //! into a single partition use std::any::Any; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::stream::{ObservedStream, RecordBatchReceiverStream}; use super::{ - DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, - Statistics, + DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; +use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; @@ -34,11 +37,15 @@ use crate::sort_pushdown::SortOrderPushdownResult; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; +use futures::ready; +use futures::stream::{Stream, StreamExt}; /// Merge execution plan executes partitions in parallel and combines them into a single /// partition. No guarantees are made about the order of the resulting partition. @@ -214,6 +221,8 @@ impl ExecutionPlan for CoalescePartitionsExec { let elapsed_compute = baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); + let batch_size = context.session_config().batch_size(); + // use a stream that allows each sender to put in at // least one result in an attempt to maximize // parallelism. @@ -231,11 +240,23 @@ impl ExecutionPlan for CoalescePartitionsExec { } let stream = builder.build(); - Ok(Box::pin(ObservedStream::new( - stream, - baseline_metrics, - self.fetch, - ))) + // Coalesce small batches from multiple partitions into + // larger batches of target_batch_size. This improves + // downstream performance (e.g. hash join build side + // benefits from fewer, larger batches). + Ok(Box::pin(CoalescedStream { + input: Box::pin(ObservedStream::new( + stream, + baseline_metrics, + self.fetch, + )), + coalescer: LimitedBatchCoalescer::new( + self.schema(), + batch_size, + None, // fetch is already handled by ObservedStream + ), + completed: false, + })) } } } @@ -352,6 +373,55 @@ impl ExecutionPlan for CoalescePartitionsExec { } } +/// Stream that coalesces small batches into larger ones using +/// [`LimitedBatchCoalescer`]. +struct CoalescedStream { + input: SendableRecordBatchStream, + coalescer: LimitedBatchCoalescer, + completed: bool, +} + +impl Stream for CoalescedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + if let Some(batch) = self.coalescer.next_completed_batch() { + return Poll::Ready(Some(Ok(batch))); + } + if self.completed { + return Poll::Ready(None); + } + let input_batch = ready!(self.input.poll_next_unpin(cx)); + match input_batch { + None => { + self.completed = true; + self.coalescer.finish()?; + } + Some(Ok(batch)) => { + match self.coalescer.push_batch(batch)? { + PushBatchStatus::Continue => {} + PushBatchStatus::LimitReached => { + self.completed = true; + self.coalescer.finish()?; + } + } + } + other => return Poll::Ready(other), + } + } + } +} + +impl RecordBatchStream for CoalescedStream { + fn schema(&self) -> SchemaRef { + self.coalescer.schema() + } +} + #[cfg(test)] mod tests { use super::*; @@ -383,10 +453,9 @@ mod tests { 1 ); - // the result should contain 4 batches (one per input partition) + // the result should contain all rows (coalesced into fewer batches) let iter = merge.execute(0, task_ctx)?; let batches = common::collect(iter).await?; - assert_eq!(batches.len(), num_partitions); // there should be a total of 400 rows (100 per each partition) let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum(); diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs index ad40d6776df4f..ff4a67fac1bf7 100644 --- a/datafusion/physical-plan/src/joins/array_map.rs +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -157,12 +157,17 @@ impl ArrayMap { max_val.wrapping_sub(min_val) } - /// Creates a new [`ArrayMap`] from the given array of join keys. + /// Creates a new [`ArrayMap`] from per-batch arrays of join keys. /// - /// Note: This function processes only the non-null values in the input `array`, + /// Note: This function processes only the non-null values in the input arrays, /// ignoring any rows where the key is `NULL`. /// - pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result { + pub(crate) fn try_new( + arrays: &[&ArrayRef], + total_num_rows: usize, + min_val: u64, + max_val: u64, + ) -> Result { let range = max_val.wrapping_sub(min_val); if range >= usize::MAX as u64 { return internal_err!("ArrayMap key range is too large to be allocated."); @@ -173,10 +178,16 @@ impl ArrayMap { let mut next: Vec = vec![]; let mut num_of_distinct_key = 0; + let data_type = arrays + .first() + .map(|a| a.data_type().clone()) + .unwrap_or(DataType::Int32); + downcast_supported_integer!( - array.data_type() => ( - fill_data, - array, + &data_type => ( + fill_data_batched, + arrays, + total_num_rows, min_val, &mut data, &mut next, @@ -192,8 +203,9 @@ impl ArrayMap { }) } - fn fill_data( - array: &ArrayRef, + fn fill_data_batched( + arrays: &[&ArrayRef], + total_num_rows: usize, offset_val: u64, data: &mut [u32], next: &mut Vec, @@ -202,25 +214,32 @@ impl ArrayMap { where T::Native: AsPrimitive, { - let arr = array.as_primitive::(); // Iterate in reverse to maintain FIFO order when there are duplicate keys. - for (i, val) in arr.iter().enumerate().rev() { - if let Some(val) = val { - let key: u64 = val.as_(); - let idx = key.wrapping_sub(offset_val) as usize; - if idx >= data.len() { - return internal_err!("failed build Array idx >= data.len()"); - } - - if data[idx] != 0 { - if next.is_empty() { - *next = vec![0; array.len()] + // We iterate batches in reverse, and within each batch iterate rows in reverse, + // using a flat index that spans all batches. + let mut flat_offset = total_num_rows; + for array in arrays.iter().rev() { + let arr = array.as_primitive::(); + flat_offset -= arr.len(); + for (row_idx, val) in arr.iter().enumerate().rev() { + if let Some(val) = val { + let key: u64 = val.as_(); + let idx = key.wrapping_sub(offset_val) as usize; + if idx >= data.len() { + return internal_err!("failed build Array idx >= data.len()"); } - next[i] = data[idx] - } else { - *num_of_distinct_key += 1; + let flat_idx = flat_offset + row_idx; + + if data[idx] != 0 { + if next.is_empty() { + *next = vec![0; total_num_rows] + } + next[flat_idx] = data[idx] + } else { + *num_of_distinct_key += 1; + } + data[idx] = flat_idx as u32 + 1; } - data[idx] = (i) as u32 + 1; } } Ok(()) @@ -419,7 +438,7 @@ mod tests { #[test] fn test_array_map_limit_offset_duplicate_elements() -> Result<()> { let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2])); - let map = ArrayMap::try_new(&build, 1, 2)?; + let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?; let probe = [Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef]; let mut prob_idx = Vec::new(); @@ -450,7 +469,7 @@ mod tests { #[test] fn test_array_map_with_limit_and_misses() -> Result<()> { let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); - let map = ArrayMap::try_new(&build, 1, 2)?; + let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?; let probe = [Arc::new(Int32Array::from(vec![10, 1, 2])) as ArrayRef]; let (mut p_idx, mut b_idx) = (vec![], vec![]); @@ -483,7 +502,7 @@ mod tests { #[test] fn test_array_map_with_build_duplicates_and_misses() -> Result<()> { let build_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 1])); - let array_map = ArrayMap::try_new(&build_array, 1, 1)?; + let array_map = ArrayMap::try_new(&[&build_array], build_array.len(), 1, 1)?; // prob: 10(m), 1(h1, h2), 20(m), 1(h1, h2) let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 1, 20, 1])); let prob_side_keys = [probe_array]; @@ -513,7 +532,12 @@ mod tests { let min_val = -5_i128; let max_val = 10_i128; - let array_map = ArrayMap::try_new(&build_array, min_val as u64, max_val as u64)?; + let array_map = ArrayMap::try_new( + &[&build_array], + build_array.len(), + min_val as u64, + max_val as u64, + )?; // Probe array let probe_array: ArrayRef = Arc::new(Int64Array::from(vec![0, -5, 10, -1])); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c66123facb627..e7c90af69aa9a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,8 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{ArrayRef, BooleanBufferBuilder}; -use arrow::compute::concat_batches; +use arrow::array::{ArrayRef, BooleanBufferBuilder, UInt64Array}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -106,14 +105,14 @@ const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; #[expect(clippy::too_many_arguments)] fn try_create_array_map( bounds: &Option, - schema: &SchemaRef, + _schema: &SchemaRef, batches: &[RecordBatch], on_left: &[PhysicalExprRef], reservation: &mut MemoryReservation, perfect_hash_join_small_build_threshold: usize, perfect_hash_join_min_key_density: f64, null_equality: NullEquality, -) -> Result)>> { +) -> Result> { if on_left.len() != 1 { return Ok(None); } @@ -178,12 +177,39 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - let batch = concat_batches(schema, batches)?; - let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + // Evaluate key expressions per-batch (no concatenation needed) + let per_batch_keys: Vec = batches + .iter() + .map(|batch| { + let arrays = evaluate_expressions_to_arrays(on_left, batch)?; + Ok(arrays.into_iter().next().unwrap()) + }) + .collect::>>()?; + let key_refs: Vec<&ArrayRef> = per_batch_keys.iter().collect(); + + let array_map = ArrayMap::try_new(&key_refs, num_row, min_val, max_val)?; - let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; + Ok(Some(array_map)) +} - Ok(Some((array_map, batch, left_values))) +/// Convert flat indices (used in the visited bitmap) to packed composite indices +/// `(batch_idx << 32) | row_idx` used by `build_batch_from_indices`. +pub(super) fn flat_to_packed_indices( + flat_indices: &UInt64Array, + batch_offsets: &[usize], +) -> UInt64Array { + flat_indices + .values() + .iter() + .map(|&flat_val| { + let flat = flat_val as usize; + let batch_idx = batch_offsets + .partition_point(|&o| o <= flat) + .saturating_sub(1); + let row_idx = flat - batch_offsets[batch_idx]; + ((batch_idx as u64) << 32) | (row_idx as u64) + }) + .collect() } /// HashTable and input data for the left (build side) of a join @@ -191,10 +217,17 @@ pub(super) struct JoinLeftData { /// The hash table with indices into `batch` /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown pub(super) map: Arc, - /// The input rows for the build side - batch: RecordBatch, - /// The build side on expressions values - values: Vec, + /// The schema of the build side input + schema: SchemaRef, + /// The input batches for the build side (kept separate to avoid concat_batches) + batches: Vec, + /// Prefix-sum of batch sizes for converting flat indices to (batch_idx, row_idx). + /// batch_offsets[i] is the starting flat index for batches[i]. + /// Has length batches.len() + 1 (last element is total row count). + batch_offsets: Vec, + /// The build side key expression values, per batch. + /// values_per_batch[batch_idx][key_idx] is the key column array for that batch. + values_per_batch: Vec>, /// Shared bitmap builder for visited left indices visited_indices_bitmap: SharedBitmapBuilder, /// Counter of running probe-threads, potentially @@ -225,14 +258,29 @@ impl JoinLeftData { &self.map } - /// returns a reference to the build side batch - pub(super) fn batch(&self) -> &RecordBatch { - &self.batch + /// Returns a reference to a build-side batch for schema/data-type lookups. + /// If batches exist, returns the first one; otherwise creates an empty batch + /// with the correct build-side schema. + pub(super) fn batch(&self) -> RecordBatch { + self.batches + .first() + .cloned() + .unwrap_or_else(|| RecordBatch::new_empty(Arc::clone(&self.schema))) } - /// returns a reference to the build side expressions values - pub(super) fn values(&self) -> &[ArrayRef] { - &self.values + /// returns a reference to the build side batches + pub(super) fn batches(&self) -> &[RecordBatch] { + &self.batches + } + + /// returns the batch offsets for flat-to-(batch,row) index conversion + pub(super) fn batch_offsets(&self) -> &[usize] { + &self.batch_offsets + } + + /// returns a reference to the build side key expressions values, per batch + pub(super) fn values_per_batch(&self) -> &[Vec] { + &self.values_per_batch } /// returns a reference to the visited indices bitmap @@ -1965,8 +2013,8 @@ async fn collect_left_input( _ => None, }; - let (join_hash_map, batch, left_values) = - if let Some((array_map, batch, left_value)) = try_create_array_map( + let (join_hash_map, should_reverse_batches) = if let Some(array_map) = + try_create_array_map( &bounds, &schema, &batches, @@ -1976,70 +2024,110 @@ async fn collect_left_input( config.execution.perfect_hash_join_min_key_density, null_equality, )? { - array_map_created_count.add(1); - metrics.build_mem_used.add(array_map.size()); + array_map_created_count.add(1); + metrics.build_mem_used.add(array_map.size()); - (Map::ArrayMap(array_map), batch, left_value) + (Map::ArrayMap(array_map), false) + } else { + // Estimation of memory size, required for hashtable, prior to allocation. + // Final result can be verified using `RawTable.allocation_info()` + let fixed_size_u32 = size_of::(); + let fixed_size_u64 = size_of::(); + + // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the + // `u64` indice variant + // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown + let mut hashmap: Box = if num_rows > u32::MAX as usize { + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU64::with_capacity(num_rows)) } else { - // Estimation of memory size, required for hashtable, prior to allocation. - // Final result can be verified using `RawTable.allocation_info()` - let fixed_size_u32 = size_of::(); - let fixed_size_u64 = size_of::(); - - // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the - // `u64` indice variant - // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown - let mut hashmap: Box = if num_rows > u32::MAX as usize { - let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU64::with_capacity(num_rows)) - } else { - let estimated_hashtable_size = - estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU32::with_capacity(num_rows)) - }; - - let mut hashes_buffer = Vec::new(); - let mut offset = 0; - - let batches_iter = batches.iter().rev(); - - // Updating hashmap starting from the last batch - for batch in batches_iter.clone() { - hashes_buffer.clear(); - hashes_buffer.resize(batch.num_rows(), 0); - update_hash( - &on_left, - batch, - &mut *hashmap, - offset, - &random_state, - &mut hashes_buffer, - 0, - true, - )?; - offset += batch.num_rows(); - } + let estimated_hashtable_size = + estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU32::with_capacity(num_rows)) + }; - // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + + let batches_iter = batches.iter().rev(); + + // Updating hashmap starting from the last batch + for batch in batches_iter.clone() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + batch, + &mut *hashmap, + offset, + &random_state, + &mut hashes_buffer, + 0, + true, + )?; + offset += batch.num_rows(); + } - let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + (Map::HashMap(hashmap), true) + }; - (Map::HashMap(hashmap), batch, left_values) - }; + // For HashMap: reverse the batches to match the hash map's flat index ordering. + // The hash map is built iterating in reverse (last batch first), so flat index 0 + // corresponds to the first row of the last original batch. By reversing, batches[0] + // is the last original batch, matching the hash map's indexing scheme. + // For ArrayMap: batches are already in the correct (forward) order since the + // ArrayMap is built from forward-ordered concatenated key columns. + let mut batches = batches; + if should_reverse_batches { + batches.reverse(); + } + // Remove empty batches to avoid ambiguity in flat_to_packed_indices: + // duplicate offsets from empty batches cause partition_point to return + // incorrect batch indices. + batches.retain(|b| b.num_rows() > 0); + + // Evaluate key expressions per-batch (no concatenation needed for equal_rows_arr) + let values_per_batch: Vec> = if batches.is_empty() { + let empty_batch = RecordBatch::new_empty(Arc::clone(&schema)); + let empty_keys = on_left + .iter() + .map(|c| c.evaluate(&empty_batch)?.into_array(0)) + .collect::>>()?; + vec![empty_keys] + } else { + batches + .iter() + .map(|batch| { + on_left + .iter() + .map(|c| c.evaluate(batch)?.into_array(batch.num_rows())) + .collect::>>() + }) + .collect::>>()? + }; + + // Compute batch_offsets (prefix sum of batch sizes) for flat-to-packed index conversion + let batch_offsets = { + let mut offsets = Vec::with_capacity(batches.len() + 1); + offsets.push(0usize); + for b in &batches { + offsets.push(offsets.last().unwrap() + b.num_rows()); + } + offsets + }; // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { - let bitmap_size = bit_util::ceil(batch.num_rows(), 8); + let bitmap_size = bit_util::ceil(num_rows, 8); reservation.try_grow(bitmap_size)?; metrics.build_mem_used.add(bitmap_size); - let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows()); + let mut bitmap_buffer = BooleanBufferBuilder::new(num_rows); bitmap_buffer.append_n(num_rows, false); bitmap_buffer } else { @@ -2051,15 +2139,14 @@ async fn collect_left_input( let membership = if num_rows == 0 { PushdownStrategy::Empty } else { - // If the build side is small enough we can use IN list pushdown. - // If it's too big we fall back to pushing down a reference to the hash table. - // See `PushdownStrategy` for more details. - let estimated_size = left_values + // Estimate total size from per-batch values (avoid concatenation for size check) + let estimated_size: usize = values_per_batch .iter() + .flat_map(|keys| keys.iter()) .map(|arr| arr.get_array_memory_size()) - .sum::(); - if left_values.is_empty() - || left_values[0].is_empty() + .sum(); + if values_per_batch.is_empty() + || values_per_batch[0].is_empty() || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size || map.num_of_distinct_key() > config @@ -2067,7 +2154,9 @@ async fn collect_left_input( .hash_join_inlist_pushdown_max_distinct_values { PushdownStrategy::Map(Arc::clone(&map)) - } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? { + } else if let Some(in_list_values) = + build_struct_inlist_values(&values_per_batch)? + { PushdownStrategy::InList(in_list_values) } else { PushdownStrategy::Map(Arc::clone(&map)) @@ -2080,8 +2169,10 @@ async fn collect_left_input( let data = JoinLeftData { map, - batch, - values: left_values, + schema: Arc::clone(&schema), + batches, + batch_offsets, + values_per_batch, visited_indices_bitmap: Mutex::new(visited_indices_bitmap), probe_threads_counter: AtomicUsize::new(probe_threads_count), _reservation: reservation, @@ -4341,7 +4432,8 @@ mod tests { let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &[vec![left_keys_values]], + &[0, left.num_rows()], &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, @@ -4402,7 +4494,8 @@ mod tests { let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &[vec![left_keys_values]], + &[0, left.num_rows()], &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, diff --git a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs index 0ca338265ecc6..cd4c94f1b5c2c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs +++ b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs @@ -32,49 +32,54 @@ pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result { .collect() } -/// Builds InList values from join key column arrays. +/// Builds InList values from per-batch join key column arrays. /// -/// If `join_key_arrays` is: -/// 1. A single array, let's say Int32, this will produce a flat -/// InList expression where the lookup is expected to be scalar Int32 values, -/// that is: this will produce `IN LIST (1, 2, 3)` expected to be used as `2 IN LIST (1, 2, 3)`. -/// 2. An Int32 array and a Utf8 array, this will produce a Struct InList expression -/// where the lookup is expected to be Struct values with two fields (Int32, Utf8), -/// that is: this will produce `IN LIST ((1, "a"), (2, "b"))` expected to be used as `(2, "b") IN LIST ((1, "a"), (2, "b"))`. -/// The field names of the struct are auto-generated as "c0", "c1", ... and should match the struct expression used in the join keys. +/// `values_per_batch` is indexed as `values_per_batch[batch_idx][key_idx]`. +/// +/// If there is a single key column, this concatenates the per-batch arrays into a single flat array. +/// If there are multiple key columns, this builds per-batch StructArrays and concatenates them. /// /// Note that this function does not deduplicate values - deduplication will happen later /// when building an InList expression from this array via `InListExpr::try_new_from_array`. -/// -/// Returns `None` if the estimated size exceeds `max_size_bytes` or if the number of rows -/// exceeds `max_distinct_values`. pub(super) fn build_struct_inlist_values( - join_key_arrays: &[ArrayRef], + values_per_batch: &[Vec], ) -> Result> { - // Build the source array/struct - let source_array: ArrayRef = if join_key_arrays.len() == 1 { - // Single column: use directly - Arc::clone(&join_key_arrays[0]) + if values_per_batch.is_empty() || values_per_batch[0].is_empty() { + return Ok(None); + } + + let num_keys = values_per_batch[0].len(); + + if num_keys == 1 { + // Single column: concat per-batch arrays directly + let arrays: Vec<&dyn arrow::array::Array> = values_per_batch + .iter() + .map(|keys| keys[0].as_ref()) + .collect(); + let concatenated = arrow::compute::concat(&arrays)?; + Ok(Some(concatenated)) } else { - // Multi-column: build StructArray once from all columns - let fields = build_struct_fields( - &join_key_arrays - .iter() - .map(|arr| arr.data_type().clone()) - .collect::>(), - )?; - - // Build field references with proper Arc wrapping - let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields + // Multi-column: build per-batch StructArrays (zero-copy wrap), then concat + let data_types: Vec = values_per_batch[0] .iter() - .cloned() - .zip(join_key_arrays.iter().cloned()) + .map(|arr| arr.data_type().clone()) .collect(); + let fields = build_struct_fields(&data_types)?; - Arc::new(StructArray::from(arrays_with_fields)) - }; + let struct_arrays: Vec = values_per_batch + .iter() + .map(|keys| { + let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = + fields.iter().cloned().zip(keys.iter().cloned()).collect(); + Arc::new(StructArray::from(arrays_with_fields)) as ArrayRef + }) + .collect(); - Ok(Some(source_array)) + let refs: Vec<&dyn arrow::array::Array> = + struct_arrays.iter().map(|a| a.as_ref()).collect(); + let concatenated = arrow::compute::concat(&refs)?; + Ok(Some(concatenated)) + } } #[cfg(test)] @@ -89,22 +94,34 @@ mod tests { #[test] fn test_build_single_column_inlist_array() { let array = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef; - let result = build_struct_inlist_values(std::slice::from_ref(&array)) - .unwrap() - .unwrap(); + // Single batch with single key column + let batches = vec![vec![Arc::clone(&array)]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert!(array.eq(&result)); } + #[test] + fn test_build_single_column_multi_batch() { + let array1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let array2 = Arc::new(Int32Array::from(vec![2, 1])) as ArrayRef; + // Two batches with single key column + let batches = vec![vec![Arc::clone(&array1)], vec![Arc::clone(&array2)]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); + + let expected = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef; + assert!(expected.eq(&result)); + } + #[test] fn test_build_multi_column_inlist() { let array1 = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef; let array2 = Arc::new(StringArray::from(vec!["a", "b", "c", "b", "a"])) as ArrayRef; - let result = build_struct_inlist_values(&[array1, array2]) - .unwrap() - .unwrap(); + // Single batch with two key columns + let batches = vec![vec![array1, array2]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert_eq!( *result.data_type(), @@ -124,9 +141,9 @@ mod tests { let int_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; - let result = build_struct_inlist_values(&[dict_array, int_array]) - .unwrap() - .unwrap(); + // Single batch with two key columns + let batches = vec![vec![dict_array, int_array]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert_eq!(result.len(), 3); assert_eq!( @@ -150,9 +167,9 @@ mod tests { let values = Arc::new(StringArray::from(vec!["foo"])); let dict_array = Arc::new(DictionaryArray::new(keys, values)) as ArrayRef; - let result = build_struct_inlist_values(std::slice::from_ref(&dict_array)) - .unwrap() - .unwrap(); + // Single batch with single key column + let batches = vec![vec![Arc::clone(&dict_array)]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert_eq!(result.len(), 3); assert_eq!(result.data_type(), dict_array.data_type()); diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 57218244bae6b..f3eed84ce8e7b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -285,7 +285,8 @@ impl RecordBatchStream for HashJoinStream { #[expect(clippy::too_many_arguments)] pub(super) fn lookup_join_hashmap( build_hashmap: &dyn JoinHashMapType, - build_side_values: &[ArrayRef], + build_side_values_per_batch: &[Vec], + batch_offsets: &[usize], probe_side_values: &[ArrayRef], null_equality: NullEquality, hashes_buffer: &[u64], @@ -302,8 +303,11 @@ pub(super) fn lookup_join_hashmap( build_indices_buffer, ); + // Convert flat indices from hash map to packed composite indices let build_indices_unfiltered: UInt64Array = std::mem::take(build_indices_buffer).into(); + let build_indices_unfiltered = + super::exec::flat_to_packed_indices(&build_indices_unfiltered, batch_offsets); let probe_indices_unfiltered: UInt32Array = std::mem::take(probe_indices_buffer).into(); @@ -312,7 +316,7 @@ pub(super) fn lookup_join_hashmap( let (build_indices, probe_indices) = equal_rows_arr( &build_indices_unfiltered, &probe_indices_unfiltered, - build_side_values, + build_side_values_per_batch, probe_side_values, null_equality, )?; @@ -649,7 +653,7 @@ impl HashJoinStream { if is_empty && self.filter.is_none() { let result = build_batch_empty_build_side( &self.schema, - build_side.left_data.batch(), + &build_side.left_data.batch().schema(), &state.batch, &self.column_indices, self.join_type, @@ -666,7 +670,8 @@ impl HashJoinStream { { Map::HashMap(map) => lookup_join_hashmap( map.as_ref(), - build_side.left_data.values(), + build_side.left_data.values_per_batch(), + build_side.left_data.batch_offsets(), &state.values, self.null_equality, &self.hashes_buffer, @@ -683,8 +688,15 @@ impl HashJoinStream { &mut self.probe_indices_buffer, &mut self.build_indices_buffer, )?; + // ArrayMap returns flat indices; convert to packed composite indices + let flat_build_indices: UInt64Array = + self.build_indices_buffer.clone().into(); + let packed_build_indices = super::exec::flat_to_packed_indices( + &flat_build_indices, + build_side.left_data.batch_offsets(), + ); ( - UInt64Array::from(self.build_indices_buffer.clone()), + packed_build_indices, UInt32Array::from(self.probe_indices_buffer.clone()), next_offset, ) @@ -706,7 +718,7 @@ impl HashJoinStream { // apply join filter if exists let (left_indices, right_indices) = if let Some(filter) = &self.filter { apply_join_filter_to_indices( - build_side.left_data.batch(), + build_side.left_data.batches(), &state.batch, left_indices, right_indices, @@ -720,10 +732,15 @@ impl HashJoinStream { }; // mark joined left-side indices as visited, if required by join type + // Convert packed indices to flat indices for the bitmap if need_produce_result_in_final(self.join_type) { + let batch_offsets = build_side.left_data.batch_offsets(); let mut bitmap = build_side.left_data.visited_indices_bitmap().lock(); - left_indices.iter().flatten().for_each(|x| { - bitmap.set_bit(x as usize, true); + left_indices.iter().flatten().for_each(|packed| { + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + let flat = batch_offsets[batch_idx] + row_idx; + bitmap.set_bit(flat, true); }); } @@ -767,23 +784,29 @@ impl HashJoinStream { )?; // Build output batch and push to coalescer - let (build_batch, probe_batch, join_side) = - if self.join_type == JoinType::RightMark { - (&state.batch, build_side.left_data.batch(), JoinSide::Right) - } else { - (build_side.left_data.batch(), &state.batch, JoinSide::Left) - }; - - let batch = build_batch_from_indices( - &self.schema, - build_batch, - probe_batch, - &left_indices, - &right_indices, - &self.column_indices, - join_side, - self.join_type, - )?; + let batch = if self.join_type == JoinType::RightMark { + build_batch_from_indices( + &self.schema, + std::slice::from_ref(&state.batch), + &build_side.left_data.batch(), + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Right, + self.join_type, + )? + } else { + build_batch_from_indices( + &self.schema, + build_side.left_data.batches(), + &state.batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + self.join_type, + )? + }; let push_status = self.output_buffer.push_batch(batch)?; @@ -842,11 +865,16 @@ impl HashJoinStream { } // use the global left bitmap to produce the left indices and right indices - let (mut left_side, mut right_side) = get_final_indices_from_shared_bitmap( + // The bitmap uses flat indices; convert them to packed composite indices + // for build_batch_from_indices. + let (flat_left_side, mut right_side) = get_final_indices_from_shared_bitmap( build_side.left_data.visited_indices_bitmap(), self.join_type, true, ); + let batch_offsets = build_side.left_data.batch_offsets(); + let mut left_side = + super::exec::flat_to_packed_indices(&flat_left_side, batch_offsets); // For null-aware anti join, filter out LEFT rows with NULL in join keys // BUT only if the probe side (RIGHT) was non-empty. If probe side is empty, @@ -860,17 +888,20 @@ impl HashJoinStream { .load(Ordering::Relaxed) { // Since null_aware validation ensures single column join, we only check the first column - let build_key_column = &build_side.left_data.values()[0]; + let values_per_batch = build_side.left_data.values_per_batch(); // Filter out indices where the key is NULL + // left_side contains packed indices: (batch_idx << 32) | row_idx let filtered_indices: Vec = left_side .iter() - .filter_map(|idx| { - let idx_usize = idx.unwrap() as usize; - if build_key_column.is_null(idx_usize) { + .filter_map(|idx: Option| { + let packed = idx.unwrap(); + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + if values_per_batch[batch_idx][0].is_null(row_idx) { None // Skip rows with NULL keys } else { - Some(idx.unwrap()) + Some(packed) } }) .collect(); @@ -895,7 +926,7 @@ impl HashJoinStream { let empty_right_batch = RecordBatch::new_empty(self.right.schema()); let batch = build_batch_from_indices( &self.schema, - build_side.left_data.batch(), + build_side.left_data.batches(), &empty_right_batch, &left_side, &right_side, diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs index bb32a222de962..5d07a91673d40 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs @@ -416,7 +416,7 @@ struct BatchProcessState { impl BatchProcessState { pub(crate) fn new(schema: Arc, batch_size: usize) -> Self { Self { - output_batches: Box::new(BatchCoalescer::new(schema, batch_size)), + output_batches: Box::new(BatchCoalescer::new(schema, batch_size.max(8))), unmatched_indices: PrimitiveBuilder::new(), start_buffer_idx: 0, start_stream_idx: 0, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 4dcbe1f647990..94650301b9461 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -402,7 +402,7 @@ impl JoinedRecordBatches { /// Clears batches without touching metadata (for early return when no filtering needed) fn clear_batches(&mut self, schema: &SchemaRef, batch_size: usize) { - self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size) + self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size.max(8)) .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)); } @@ -513,7 +513,7 @@ impl JoinedRecordBatches { } fn clear(&mut self, schema: &SchemaRef, batch_size: usize) { - self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size) + self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size.max(8)) .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)); self.filter_metadata = FilterMetadata::new(); self.debug_assert_empty_consistency(); @@ -785,7 +785,7 @@ impl SortMergeJoinStream { .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)), filter_metadata: FilterMetadata::new(), }, - output: BatchCoalescer::new(schema, batch_size) + output: BatchCoalescer::new(schema, batch_size.max(8)) .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)), batch_size, join_type, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 4429d1e3fbe5e..b8adc77e771db 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -953,7 +953,7 @@ pub(crate) fn build_side_determined_results( // Build the final result from the indices of build and probe sides: build_batch_from_indices( output_schema.as_ref(), - &build_hash_joiner.input_buffer, + std::slice::from_ref(&build_hash_joiner.input_buffer), &empty_probe_batch, &build_indices, &probe_indices, @@ -1016,7 +1016,7 @@ pub(crate) fn join_with_probe_batch( let (build_indices, probe_indices) = if let Some(filter) = filter { apply_join_filter_to_indices( - &build_hash_joiner.input_buffer, + std::slice::from_ref(&build_hash_joiner.input_buffer), probe_batch, build_indices, probe_indices, @@ -1056,7 +1056,7 @@ pub(crate) fn join_with_probe_batch( } else { build_batch_from_indices( schema, - &build_hash_joiner.input_buffer, + std::slice::from_ref(&build_hash_joiner.input_buffer), probe_batch, &build_indices, &probe_indices, @@ -1147,10 +1147,11 @@ fn lookup_join_hashmap( let build_indices: UInt64Array = matched_build.into(); let probe_indices: UInt32Array = matched_probe.into(); + let build_join_values_per_batch = [build_join_values]; let (build_indices, probe_indices) = equal_rows_arr( &build_indices, &probe_indices, - &build_join_values, + &build_join_values_per_batch, &keys_values, null_equality, )?; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index cf4bf2cd163fd..eeae0efa5fe1c 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -41,9 +41,9 @@ pub use crate::joins::{JoinOn, JoinOnRef}; use ahash::RandomState; use arrow::array::{ - Array, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray, - RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, UInt64Array, - builder::UInt64Builder, downcast_array, new_null_array, + Array, ArrayAccessor, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, + PrimitiveArray, RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, + UInt64Array, builder::UInt64Builder, downcast_array, new_null_array, }; use arrow::array::{ ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, @@ -54,7 +54,7 @@ use arrow::array::{ }; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::kernels::cmp::eq; -use arrow::compute::{self, FilterBuilder, and, take}; +use arrow::compute::{self, FilterBuilder, take}; use arrow::datatypes::{ ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type, }; @@ -912,7 +912,7 @@ pub(crate) fn get_final_indices_from_bit_map( #[expect(clippy::too_many_arguments)] pub(crate) fn apply_join_filter_to_indices( - build_input_buffer: &RecordBatch, + build_batches: &[RecordBatch], probe_batch: &RecordBatch, build_indices: UInt64Array, probe_indices: UInt32Array, @@ -934,7 +934,7 @@ pub(crate) fn apply_join_filter_to_indices( let len = end - i; let intermediate_batch = build_batch_from_indices( filter.schema(), - build_input_buffer, + build_batches, probe_batch, &build_indices.slice(i, len), &probe_indices.slice(i, len), @@ -956,7 +956,7 @@ pub(crate) fn apply_join_filter_to_indices( } else { let intermediate_batch = build_batch_from_indices( filter.schema(), - build_input_buffer, + build_batches, probe_batch, &build_indices, &probe_indices, @@ -992,12 +992,49 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result) -> ArrayRef { + let combined = NullBuffer::union(result.nulls(), index_nulls); + // SAFETY: We only modify the null buffer, which is the union of the existing nulls + // and the index nulls. All other array data (buffers, offsets, child data) is unchanged. + unsafe { + arrow::array::make_array( + result + .into_data() + .into_builder() + .nulls(combined) + .build_unchecked(), + ) + } +} + +/// Decode packed composite indices into (batch_index, row_index) pairs. +/// +/// Each u64 value encodes `(batch_idx << 32) | row_idx`. +/// For single-batch cases (batch_idx == 0), the packed value equals the row index. +fn packed_indices_to_interleave(build_indices: &UInt64Array) -> Vec<(usize, usize)> { + build_indices + .values() + .iter() + .map(|&packed| { + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + (batch_idx, row_idx) + }) + .collect() +} + /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. +/// +/// Build-side indices are packed composite keys: `(batch_idx << 32) | row_idx`. +/// Build-side columns are gathered from multiple batches using `interleave`, +/// avoiding the need to `concat_batches` the entire build side upfront. #[expect(clippy::too_many_arguments)] pub(crate) fn build_batch_from_indices( schema: &Schema, - build_input_buffer: &RecordBatch, + build_batches: &[RecordBatch], probe_batch: &RecordBatch, build_indices: &UInt64Array, probe_indices: &UInt32Array, @@ -1006,9 +1043,6 @@ pub(crate) fn build_batch_from_indices( join_type: JoinType, ) -> Result { if schema.fields().is_empty() { - // For RightAnti and RightSemi joins, after `adjust_indices_by_join_type` - // the build_indices were untouched so only probe_indices hold the actual - // row count. let row_count = match join_type { JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(), _ => build_indices.len(), @@ -1016,25 +1050,73 @@ pub(crate) fn build_batch_from_indices( return new_empty_schema_batch(schema, row_count); } - // build the columns of the new [RecordBatch]: - // 1. pick whether the column is from the left or right - // 2. based on the pick, `take` items from the different RecordBatches + // Determine build-side gathering strategy: + // - Single batch: use `take` directly (packed value == row index since batch_idx is 0) + // - Multiple batches: use `interleave` with decoded (batch_idx, row_idx) pairs + // - No batches or all-null indices: emit null arrays + enum BuildGather { + /// Single build batch — use `take` with row indices cast to u32 + SingleBatch(UInt32Array), + /// Multiple build batches — use `interleave` with (batch_idx, row_idx) pairs + MultiBatch(Vec<(usize, usize)>), + /// All build indices are null (outer join with no matches) + AllNull, + } + + let build_gather = if build_batches.is_empty() + || build_indices.null_count() == build_indices.len() + { + BuildGather::AllNull + } else if build_batches.len() == 1 { + // Single batch: packed value == row index (batch_idx is 0), use take directly + // Preserve the null buffer from build_indices for outer joins + let values: Vec = build_indices.values().iter().map(|&v| v as u32).collect(); + let row_indices = UInt32Array::new(values.into(), build_indices.nulls().cloned()); + BuildGather::SingleBatch(row_indices) + } else { + BuildGather::MultiBatch(packed_indices_to_interleave(build_indices)) + }; + let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); for column_index in column_indices { let array = if column_index.side == JoinSide::None { - // For mark joins, the mark column is a true if the indices is not null, otherwise it will be false Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { - let array = build_input_buffer.column(column_index.index); - if array.is_empty() || build_indices.null_count() == build_indices.len() { - // Outer join would generate a null index when finding no match at our side. - // Therefore, it's possible we are empty but need to populate an n-length null array, - // where n is the length of the index array. - assert_eq!(build_indices.null_count(), build_indices.len()); - new_null_array(array.data_type(), build_indices.len()) - } else { - take(array.as_ref(), build_indices, None)? + match &build_gather { + BuildGather::SingleBatch(row_indices) => { + // take() already handles null indices (produces null output), + // so no extra null mask needed + take( + build_batches[0].column(column_index.index).as_ref(), + row_indices, + None, + )? + } + BuildGather::MultiBatch(il_indices) => { + // Gather column arrays from all build batches + let arrays: Vec<&dyn Array> = build_batches + .iter() + .map(|b| b.column(column_index.index).as_ref()) + .collect(); + let result = compute::interleave(&arrays, il_indices)?; + // Apply null mask from build_indices (for outer joins where + // unmatched rows are represented as null indices) + apply_null_mask(result, build_indices.nulls()) + } + BuildGather::AllNull => { + // All build indices are null (outer join with no matches) + // Use the build batch schema to get the correct data type + // (column_index.index refers to the build batch columns, not the output schema) + let data_type = if let Some(b) = build_batches.first() { + b.column(column_index.index).data_type().clone() + } else { + // No build batches available (empty build side in CollectLeft mode). + // Use the output schema field at the current column position. + schema.field(columns.len()).data_type().clone() + }; + new_null_array(&data_type, build_indices.len()) + } } } else { let array = probe_batch.column(column_index.index); @@ -1055,7 +1137,7 @@ pub(crate) fn build_batch_from_indices( /// The resulting batch has [Schema] `schema`. pub(crate) fn build_batch_empty_build_side( schema: &Schema, - build_batch: &RecordBatch, + left_schema: &Schema, probe_batch: &RecordBatch, column_indices: &[ColumnIndex], join_type: JoinType, @@ -1083,7 +1165,7 @@ pub(crate) fn build_batch_empty_build_side( let array = match column_index.side { // left -> null array JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), + left_schema.field(column_index.index).data_type(), num_rows, ), // right -> respective right array @@ -1764,35 +1846,114 @@ pub fn update_hash( Ok(()) } +/// Compares build-side and probe-side key columns for equality, filtering +/// out non-matching rows after hash collision resolution. +/// +/// Build-side indices are packed composite keys: `(batch_idx << 32) | row_idx`. +/// `left_arrays_per_batch[batch_idx][key_idx]` holds key columns per batch. +/// +/// For single-batch builds (the common case), uses `take` directly since +/// batch_idx == 0 means packed values equal row indices. For multi-batch +/// builds, uses `interleave` to gather from the correct batches. pub(super) fn equal_rows_arr( indices_left: &UInt64Array, indices_right: &UInt32Array, - left_arrays: &[ArrayRef], + left_arrays_per_batch: &[Vec], right_arrays: &[ArrayRef], null_equality: NullEquality, ) -> Result<(UInt64Array, UInt32Array)> { - let mut iter = left_arrays.iter().zip(right_arrays.iter()); - - let Some((first_left, first_right)) = iter.next() else { + if left_arrays_per_batch.is_empty() || right_arrays.is_empty() { return Ok((Vec::::new().into(), Vec::::new().into())); - }; + } - let arr_left = take(first_left.as_ref(), indices_left, None)?; - let arr_right = take(first_right.as_ref(), indices_right, None)?; + let num_rows = indices_left.len(); + let num_keys = right_arrays.len(); + let left_indices = indices_left.values(); + let right_indices = indices_right.values(); - let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; + // Build a mutable bitmap: start all-true, clear bits where keys don't match. + // This avoids allocating intermediate arrays via take+eq+and per key column. + let mut equal_bits = BooleanBufferBuilder::new(num_rows); + equal_bits.append_n(num_rows, true); - // Use map and try_fold to iterate over the remaining pairs of arrays. - // In each iteration, take is used on the pair of arrays and their equality is determined. - // The results are then folded (combined) using the and function to get a final equality result. - equal = iter - .map(|(left, right)| { - let arr_left = take(left.as_ref(), indices_left, None)?; - let arr_right = take(right.as_ref(), indices_right, None)?; - eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality) - }) - .try_fold(equal, |acc, equal2| and(&acc, &equal2?))?; + let single_batch = left_arrays_per_batch.len() == 1; + + // Pre-compute fallback index arrays once (used across all keys that need fallback). + // Lazy: only allocated if a key actually needs the fallback path. + let mut fallback_row_indices: Option = None; + let mut fallback_il_indices: Option> = None; + + for key_idx in 0..num_keys { + let right_array = &right_arrays[key_idx]; + + // Try element-wise comparison which avoids allocating intermediate arrays. + // Works for both single-batch and multi-batch builds on common types. + // Falls back to take/interleave+eq for nested/unsupported types. + let handled = if single_batch { + compare_rows_elementwise( + &mut equal_bits, + left_indices, + right_indices, + &left_arrays_per_batch[0][key_idx], + right_array, + null_equality, + ) + } else { + let left_arrays_for_key: Vec<&ArrayRef> = left_arrays_per_batch + .iter() + .map(|batch_keys| &batch_keys[key_idx]) + .collect(); + compare_rows_elementwise_multi( + &mut equal_bits, + left_indices, + right_indices, + &left_arrays_for_key, + right_array, + null_equality, + ) + }; + if !handled { + // Fallback: materialize via take/interleave, then eq + let arr_left = if single_batch { + let row_indices = fallback_row_indices.get_or_insert_with(|| { + left_indices.iter().map(|&v| v as u32).collect() + }); + take( + left_arrays_per_batch[0][key_idx].as_ref(), + row_indices, + None, + )? + } else { + let il_indices = fallback_il_indices.get_or_insert_with(|| { + left_indices + .iter() + .map(|&packed| { + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + (batch_idx, row_idx) + }) + .collect() + }); + let arrays: Vec<&dyn Array> = left_arrays_per_batch + .iter() + .map(|batch_keys| batch_keys[key_idx].as_ref()) + .collect(); + compute::interleave(&arrays, il_indices)? + }; + let arr_right = take(right_array.as_ref(), indices_right, None)?; + let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?; + // AND the result into our mutable bitmap. + // Null positions in eq_result are treated as "not equal": + // first AND with the values, then AND with the validity bitmap. + and_bitmap_with_boolean_buffer(&mut equal_bits, eq_result.values()); + if let Some(eq_nulls) = eq_result.nulls() { + and_bitmap_with_boolean_buffer(&mut equal_bits, eq_nulls.inner()); + } + } + } + + let equal = BooleanArray::new(equal_bits.finish(), None); let filter_builder = FilterBuilder::new(&equal).optimize().build(); let left_filtered = filter_builder.filter(indices_left)?; @@ -1804,6 +1965,263 @@ pub(super) fn equal_rows_arr( )) } +/// AND a `BooleanBuffer` into a `BooleanBufferBuilder` in-place using +/// word-level operations on the underlying byte slices. +fn and_bitmap_with_boolean_buffer( + builder: &mut BooleanBufferBuilder, + rhs: &BooleanBuffer, +) { + let lhs = builder.as_slice_mut(); + let rhs = rhs.inner().as_slice(); + for (l, r) in lhs.iter_mut().zip(rhs.iter()) { + *l &= r; + } +} + +/// Dispatch a macro call by Arrow DataType for element-wise comparison. +/// The `$action` macro is invoked with the concrete array type for each supported type. +/// Returns `false` for unsupported/nested types (caller should use fallback). +macro_rules! dispatch_elementwise { + ($data_type:expr, $equal_bits:expr, $left_indices:expr, $null_equality:expr, $action:ident) => { + match $data_type { + DataType::Null => { + match $null_equality { + NullEquality::NullEqualsNothing => { + for i in 0..$left_indices.len() { + $equal_bits.set_bit(i, false); + } + } + NullEquality::NullEqualsNull => {} + } + } + DataType::Boolean => $action!(BooleanArray), + DataType::Int8 => $action!(Int8Array), + DataType::Int16 => $action!(Int16Array), + DataType::Int32 => $action!(Int32Array), + DataType::Int64 => $action!(Int64Array), + DataType::UInt8 => $action!(UInt8Array), + DataType::UInt16 => $action!(UInt16Array), + DataType::UInt32 => $action!(UInt32Array), + DataType::UInt64 => $action!(UInt64Array), + DataType::Float32 => $action!(Float32Array), + DataType::Float64 => $action!(Float64Array), + DataType::Binary => $action!(BinaryArray), + DataType::BinaryView => $action!(BinaryViewArray), + DataType::FixedSizeBinary(_) => $action!(FixedSizeBinaryArray), + DataType::LargeBinary => $action!(LargeBinaryArray), + DataType::Utf8 => $action!(StringArray), + DataType::Utf8View => $action!(StringViewArray), + DataType::LargeUtf8 => $action!(LargeStringArray), + DataType::Decimal128(..) => $action!(Decimal128Array), + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => $action!(TimestampSecondArray), + TimeUnit::Millisecond => $action!(TimestampMillisecondArray), + TimeUnit::Microsecond => $action!(TimestampMicrosecondArray), + TimeUnit::Nanosecond => $action!(TimestampNanosecondArray), + }, + DataType::Date32 => $action!(Date32Array), + DataType::Date64 => $action!(Date64Array), + _ => return false, + } + }; +} + +/// Compare rows element-wise without materializing intermediate arrays. +/// Returns `true` if the comparison was handled, `false` if fallback is needed. +/// +/// Only works for single-batch builds where packed index == row index. +/// Clears bits in `equal_bits` where the left and right values at the +/// indexed positions are not equal (respecting `null_equality`). +fn compare_rows_elementwise( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left_array: &ArrayRef, + right_array: &ArrayRef, + null_equality: NullEquality, +) -> bool { + if left_array.data_type().is_nested() { + return false; + } + + macro_rules! compare_elementwise { + ($array_type:ty) => {{ + let left = left_array.as_any().downcast_ref::<$array_type>().unwrap(); + let right = right_array.as_any().downcast_ref::<$array_type>().unwrap(); + do_compare_elementwise( + equal_bits, + left_indices, + right_indices, + &left, + &right, + null_equality, + ); + }}; + } + + dispatch_elementwise!(left_array.data_type(), equal_bits, left_indices, null_equality, compare_elementwise); + true +} + +/// Inner loop for element-wise comparison. Generic over array type via `ArrayAccessor`. +/// Compares `left.value(left_indices[i] as usize)` against +/// `right.value(right_indices[i])` for each row, clearing bits that don't match. +/// +/// Only valid for single-batch builds where packed index == row index. +fn do_compare_elementwise( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left: &A, + right: &A, + null_equality: NullEquality, +) where + A::Item: PartialEq, +{ + let left_nulls = left.nulls(); + let right_nulls = right.nulls(); + let has_nulls = left.null_count() > 0 || right.null_count() > 0; + + let num_rows = left_indices.len(); + + if !has_nulls { + // Fast path: no nulls, just compare values directly + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let l_idx = left_indices[i] as usize; + let r_idx = right_indices[i] as usize; + if left.value(l_idx) != right.value(r_idx) { + equal_bits.set_bit(i, false); + } + } + } else { + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let l_idx = left_indices[i] as usize; + let r_idx = right_indices[i] as usize; + let l_null = left_nulls.is_some_and(|nulls| !nulls.is_valid(l_idx)); + let r_null = right_nulls.is_some_and(|nulls| !nulls.is_valid(r_idx)); + + let is_equal = match (l_null, r_null) { + (true, true) => null_equality == NullEquality::NullEqualsNull, + (true, false) | (false, true) => false, + (false, false) => left.value(l_idx) == right.value(r_idx), + }; + if !is_equal { + equal_bits.set_bit(i, false); + } + } + } +} + +/// Compare rows element-wise for multi-batch builds without materializing +/// intermediate arrays. Decodes packed indices to (batch_idx, row_idx) and +/// accesses the correct batch array directly. +/// Returns `true` if the comparison was handled, `false` if fallback is needed. +fn compare_rows_elementwise_multi( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left_arrays: &[&ArrayRef], + right_array: &ArrayRef, + null_equality: NullEquality, +) -> bool { + if right_array.data_type().is_nested() { + return false; + } + + macro_rules! compare_multi { + ($array_type:ty) => {{ + let left_typed: Vec<&$array_type> = left_arrays + .iter() + .map(|a| a.as_any().downcast_ref::<$array_type>().unwrap()) + .collect(); + let right = right_array.as_any().downcast_ref::<$array_type>().unwrap(); + do_compare_elementwise_multi( + equal_bits, + left_indices, + right_indices, + &left_typed, + &right, + null_equality, + ); + }}; + } + + dispatch_elementwise!(right_array.data_type(), equal_bits, left_indices, null_equality, compare_multi); + true +} + +/// Inner loop for multi-batch element-wise comparison. +/// Decodes packed indices to access the correct batch array per row. +/// +/// Takes `&[A]` where `A` is a reference type like `&Int32Array` that implements +/// `ArrayAccessor`. Null checking works through auto-deref to the `Array` trait. +fn do_compare_elementwise_multi( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left_arrays: &[A], + right: &A, + null_equality: NullEquality, +) where + A::Item: PartialEq, +{ + let right_nulls = right.nulls(); + let has_nulls = right.null_count() > 0 + || left_arrays + .iter() + .any(|a| a.null_count() > 0); + let num_rows = left_indices.len(); + + if !has_nulls { + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let packed = left_indices[i]; + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + let r_idx = right_indices[i] as usize; + if left_arrays[batch_idx].value(row_idx) != right.value(r_idx) { + equal_bits.set_bit(i, false); + } + } + } else { + // Pre-compute null buffers per batch to avoid repeated method calls in the loop + let left_nulls_per_batch: Vec> = + left_arrays.iter().map(|a| a.nulls()).collect(); + + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let packed = left_indices[i]; + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + let r_idx = right_indices[i] as usize; + let l_null = left_nulls_per_batch[batch_idx] + .is_some_and(|n| !n.is_valid(row_idx)); + let r_null = right_nulls.is_some_and(|n| !n.is_valid(r_idx)); + + let is_equal = match (l_null, r_null) { + (true, true) => null_equality == NullEquality::NullEqualsNull, + (true, false) | (false, true) => false, + (false, false) => { + left_arrays[batch_idx].value(row_idx) == right.value(r_idx) + } + }; + if !is_equal { + equal_bits.set_bit(i, false); + } + } + } +} + // version of eq_dyn supporting equality on null arrays fn eq_dyn_null( left: &dyn Array, @@ -2928,7 +3346,7 @@ mod tests { let result = build_batch_empty_build_side( &empty_schema, - &build_batch, + &build_batch.schema(), &probe_batch, &[], // no column indices with empty projection JoinType::Right,