From aa1abae825d7b3b80f7b75fab61424e634431228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 15:02:04 +0100 Subject: [PATCH 01/26] Avoid concat_batches --- .../physical-plan/src/joins/hash_join/exec.rs | 125 ++++++++++++++---- .../src/joins/hash_join/stream.rs | 51 ++++--- .../src/joins/symmetric_hash_join.rs | 12 +- datafusion/physical-plan/src/joins/utils.rs | 104 ++++++++++++--- 4 files changed, 226 insertions(+), 66 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c66123facb627..6aba4c7ae2af8 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,8 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{ArrayRef, BooleanBufferBuilder}; -use arrow::compute::concat_batches; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, new_null_array}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -106,14 +105,14 @@ const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; #[expect(clippy::too_many_arguments)] fn try_create_array_map( bounds: &Option, - schema: &SchemaRef, + _schema: &SchemaRef, batches: &[RecordBatch], on_left: &[PhysicalExprRef], reservation: &mut MemoryReservation, perfect_hash_join_small_build_threshold: usize, perfect_hash_join_min_key_density: f64, null_equality: NullEquality, -) -> Result)>> { +) -> Result)>> { if on_left.len() != 1 { return Ok(None); } @@ -178,12 +177,42 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - let batch = concat_batches(schema, batches)?; - let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + // Evaluate key expressions per-batch and concatenate only key columns + let left_values = concat_key_value_arrays(on_left, batches.iter())?; let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; - Ok(Some((array_map, batch, left_values))) + Ok(Some((array_map, left_values))) +} + +/// Evaluate key expressions per-batch and concatenate only the resulting key +/// column arrays. This avoids `concat_batches` on the full data (which would +/// duplicate all columns in memory) while still providing contiguous key arrays +/// for `equal_rows_arr`. +fn concat_key_value_arrays<'a>( + on: &[PhysicalExprRef], + batches: impl Iterator, +) -> Result> { + let batches: Vec<&RecordBatch> = batches.collect(); + if batches.is_empty() || on.is_empty() { + return Ok(on.iter().map(|_| new_null_array(&DataType::Null, 0)).collect()); + } + // Evaluate key expressions for each batch + let per_batch_keys: Vec> = batches + .iter() + .map(|batch| evaluate_expressions_to_arrays(on, batch)) + .collect::>>()?; + // Transpose and concatenate: for each key column, concat across all batches + let num_keys = on.len(); + (0..num_keys) + .map(|key_idx| { + let arrays: Vec<&dyn Array> = per_batch_keys + .iter() + .map(|keys| keys[key_idx].as_ref()) + .collect(); + Ok(arrow::compute::concat(&arrays)?) + }) + .collect() } /// HashTable and input data for the left (build side) of a join @@ -191,9 +220,15 @@ pub(super) struct JoinLeftData { /// The hash table with indices into `batch` /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown pub(super) map: Arc, - /// The input rows for the build side - batch: RecordBatch, - /// The build side on expressions values + /// The schema of the build side input + schema: SchemaRef, + /// The input batches for the build side (kept separate to avoid concat_batches) + batches: Vec, + /// Prefix-sum of batch sizes for converting flat indices to (batch_idx, row_idx). + /// batch_offsets[i] is the starting flat index for batches[i]. + /// Has length batches.len() + 1 (last element is total row count). + batch_offsets: Vec, + /// The build side on expressions values (concatenated key columns only) values: Vec, /// Shared bitmap builder for visited left indices visited_indices_bitmap: SharedBitmapBuilder, @@ -225,9 +260,29 @@ impl JoinLeftData { &self.map } - /// returns a reference to the build side batch - pub(super) fn batch(&self) -> &RecordBatch { - &self.batch + /// Returns a reference to a build-side batch for schema/data-type lookups. + /// If batches exist, returns the first one; otherwise creates an empty batch + /// with the correct build-side schema. + pub(super) fn batch(&self) -> RecordBatch { + self.batches + .first() + .cloned() + .unwrap_or_else(|| RecordBatch::new_empty(Arc::clone(&self.schema))) + } + + /// returns a reference to the build side batches + pub(super) fn batches(&self) -> &[RecordBatch] { + &self.batches + } + + /// returns the batch offsets for flat-to-(batch,row) index conversion + pub(super) fn batch_offsets(&self) -> &[usize] { + &self.batch_offsets + } + + /// returns total number of rows across all build batches + pub(super) fn num_rows(&self) -> usize { + *self.batch_offsets.last().unwrap_or(&0) } /// returns a reference to the build side expressions values @@ -1965,8 +2020,8 @@ async fn collect_left_input( _ => None, }; - let (join_hash_map, batch, left_values) = - if let Some((array_map, batch, left_value)) = try_create_array_map( + let (join_hash_map, left_values, should_reverse_batches) = + if let Some((array_map, left_value)) = try_create_array_map( &bounds, &schema, &batches, @@ -1979,7 +2034,7 @@ async fn collect_left_input( array_map_created_count.add(1); metrics.build_mem_used.add(array_map.size()); - (Map::ArrayMap(array_map), batch, left_value) + (Map::ArrayMap(array_map), left_value, false) } else { // Estimation of memory size, required for hashtable, prior to allocation. // Final result can be verified using `RawTable.allocation_info()` @@ -2025,21 +2080,41 @@ async fn collect_left_input( offset += batch.num_rows(); } - // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter.clone())?; - - let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + // Evaluate key expressions per-batch and concatenate only the key columns + // (avoids concat_batches on the full data, saving memory) + let left_values = concat_key_value_arrays(&on_left, batches_iter.clone())?; - (Map::HashMap(hashmap), batch, left_values) + (Map::HashMap(hashmap), left_values, true) }; + // For HashMap: reverse the batches to match the hash map's flat index ordering. + // The hash map is built iterating in reverse (last batch first), so flat index 0 + // corresponds to the first row of the last original batch. By reversing, batches[0] + // is the last original batch, matching the hash map's indexing scheme. + // For ArrayMap: batches are already in the correct (forward) order since the + // ArrayMap is built from forward-ordered concatenated key columns. + let mut batches = batches; + if should_reverse_batches { + batches.reverse(); + } + + // Compute batch_offsets (prefix sum of batch sizes) for flat-to-(batch,row) conversion + let batch_offsets = { + let mut offsets = Vec::with_capacity(batches.len() + 1); + offsets.push(0usize); + for b in &batches { + offsets.push(offsets.last().unwrap() + b.num_rows()); + } + offsets + }; + // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { - let bitmap_size = bit_util::ceil(batch.num_rows(), 8); + let bitmap_size = bit_util::ceil(num_rows, 8); reservation.try_grow(bitmap_size)?; metrics.build_mem_used.add(bitmap_size); - let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows()); + let mut bitmap_buffer = BooleanBufferBuilder::new(num_rows); bitmap_buffer.append_n(num_rows, false); bitmap_buffer } else { @@ -2080,7 +2155,9 @@ async fn collect_left_input( let data = JoinLeftData { map, - batch, + schema: Arc::clone(&schema), + batches, + batch_offsets, values: left_values, visited_indices_bitmap: Mutex::new(visited_indices_bitmap), probe_threads_counter: AtomicUsize::new(probe_threads_count), diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 57218244bae6b..5fb3e01deda0d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -649,7 +649,7 @@ impl HashJoinStream { if is_empty && self.filter.is_none() { let result = build_batch_empty_build_side( &self.schema, - build_side.left_data.batch(), + &build_side.left_data.batch(), &state.batch, &self.column_indices, self.join_type, @@ -706,7 +706,8 @@ impl HashJoinStream { // apply join filter if exists let (left_indices, right_indices) = if let Some(filter) = &self.filter { apply_join_filter_to_indices( - build_side.left_data.batch(), + build_side.left_data.batches(), + build_side.left_data.batch_offsets(), &state.batch, left_indices, right_indices, @@ -767,23 +768,32 @@ impl HashJoinStream { )?; // Build output batch and push to coalescer - let (build_batch, probe_batch, join_side) = - if self.join_type == JoinType::RightMark { - (&state.batch, build_side.left_data.batch(), JoinSide::Right) - } else { - (build_side.left_data.batch(), &state.batch, JoinSide::Left) - }; - - let batch = build_batch_from_indices( - &self.schema, - build_batch, - probe_batch, - &left_indices, - &right_indices, - &self.column_indices, - join_side, - self.join_type, - )?; + let batch = if self.join_type == JoinType::RightMark { + let num_rows = state.batch.num_rows(); + build_batch_from_indices( + &self.schema, + std::slice::from_ref(&state.batch), + &[0, num_rows], + &build_side.left_data.batch(), + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Right, + self.join_type, + )? + } else { + build_batch_from_indices( + &self.schema, + build_side.left_data.batches(), + build_side.left_data.batch_offsets(), + &state.batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + self.join_type, + )? + }; let push_status = self.output_buffer.push_batch(batch)?; @@ -895,7 +905,8 @@ impl HashJoinStream { let empty_right_batch = RecordBatch::new_empty(self.right.schema()); let batch = build_batch_from_indices( &self.schema, - build_side.left_data.batch(), + build_side.left_data.batches(), + build_side.left_data.batch_offsets(), &empty_right_batch, &left_side, &right_side, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 4429d1e3fbe5e..403018a898ae9 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -951,9 +951,11 @@ pub(crate) fn build_side_determined_results( // Create an empty probe record batch: let empty_probe_batch = RecordBatch::new_empty(probe_schema); // Build the final result from the indices of build and probe sides: + let num_rows = build_hash_joiner.input_buffer.num_rows(); build_batch_from_indices( output_schema.as_ref(), - &build_hash_joiner.input_buffer, + std::slice::from_ref(&build_hash_joiner.input_buffer), + &[0, num_rows], &empty_probe_batch, &build_indices, &probe_indices, @@ -1015,8 +1017,10 @@ pub(crate) fn join_with_probe_batch( )?; let (build_indices, probe_indices) = if let Some(filter) = filter { + let num_rows = build_hash_joiner.input_buffer.num_rows(); apply_join_filter_to_indices( - &build_hash_joiner.input_buffer, + std::slice::from_ref(&build_hash_joiner.input_buffer), + &[0, num_rows], probe_batch, build_indices, probe_indices, @@ -1054,9 +1058,11 @@ pub(crate) fn join_with_probe_batch( ) { Ok(None) } else { + let num_rows = build_hash_joiner.input_buffer.num_rows(); build_batch_from_indices( schema, - &build_hash_joiner.input_buffer, + std::slice::from_ref(&build_hash_joiner.input_buffer), + &[0, num_rows], probe_batch, &build_indices, &probe_indices, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index cf4bf2cd163fd..557e9d69c3e1d 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -912,7 +912,8 @@ pub(crate) fn get_final_indices_from_bit_map( #[expect(clippy::too_many_arguments)] pub(crate) fn apply_join_filter_to_indices( - build_input_buffer: &RecordBatch, + build_batches: &[RecordBatch], + build_batch_offsets: &[usize], probe_batch: &RecordBatch, build_indices: UInt64Array, probe_indices: UInt32Array, @@ -934,7 +935,8 @@ pub(crate) fn apply_join_filter_to_indices( let len = end - i; let intermediate_batch = build_batch_from_indices( filter.schema(), - build_input_buffer, + build_batches, + build_batch_offsets, probe_batch, &build_indices.slice(i, len), &probe_indices.slice(i, len), @@ -956,7 +958,8 @@ pub(crate) fn apply_join_filter_to_indices( } else { let intermediate_batch = build_batch_from_indices( filter.schema(), - build_input_buffer, + build_batches, + build_batch_offsets, probe_batch, &build_indices, &probe_indices, @@ -992,12 +995,35 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result Vec<(usize, usize)> { + build_indices + .values() + .iter() + .map(|&flat_idx| { + let flat = flat_idx as usize; + // Binary search to find which batch this flat index belongs to + let batch_idx = batch_offsets.partition_point(|&o| o <= flat) - 1; + let row_idx = flat - batch_offsets[batch_idx]; + (batch_idx, row_idx) + }) + .collect() +} + /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. +/// +/// Build-side columns are gathered from multiple batches using `interleave`, +/// avoiding the need to `concat_batches` the entire build side upfront. #[expect(clippy::too_many_arguments)] pub(crate) fn build_batch_from_indices( schema: &Schema, - build_input_buffer: &RecordBatch, + build_batches: &[RecordBatch], + build_batch_offsets: &[usize], probe_batch: &RecordBatch, build_indices: &UInt64Array, probe_indices: &UInt32Array, @@ -1006,9 +1032,6 @@ pub(crate) fn build_batch_from_indices( join_type: JoinType, ) -> Result { if schema.fields().is_empty() { - // For RightAnti and RightSemi joins, after `adjust_indices_by_join_type` - // the build_indices were untouched so only probe_indices hold the actual - // row count. let row_count = match join_type { JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(), _ => build_indices.len(), @@ -1016,25 +1039,68 @@ pub(crate) fn build_batch_from_indices( return new_empty_schema_batch(schema, row_count); } - // build the columns of the new [RecordBatch]: - // 1. pick whether the column is from the left or right - // 2. based on the pick, `take` items from the different RecordBatches + // Pre-compute interleave indices for build-side columns (shared across all build columns) + let interleave_indices = if build_batches.is_empty() + || build_indices.null_count() == build_indices.len() + { + None + } else { + Some(flat_indices_to_interleave(build_indices, build_batch_offsets)) + }; + let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); for column_index in column_indices { let array = if column_index.side == JoinSide::None { - // For mark joins, the mark column is a true if the indices is not null, otherwise it will be false Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { - let array = build_input_buffer.column(column_index.index); - if array.is_empty() || build_indices.null_count() == build_indices.len() { - // Outer join would generate a null index when finding no match at our side. - // Therefore, it's possible we are empty but need to populate an n-length null array, - // where n is the length of the index array. - assert_eq!(build_indices.null_count(), build_indices.len()); - new_null_array(array.data_type(), build_indices.len()) + if let Some(ref il_indices) = interleave_indices { + // Gather column arrays from all build batches + let arrays: Vec<&dyn Array> = build_batches + .iter() + .map(|b| b.column(column_index.index).as_ref()) + .collect(); + let result = compute::interleave(&arrays, il_indices)?; + // Apply null mask from build_indices (for outer joins where + // unmatched rows are represented as null indices) + if build_indices.null_count() > 0 { + if let Some(idx_nulls) = build_indices.nulls() { + // Combine the existing data nulls with the build_indices nulls + let data = result.to_data(); + let combined_nulls = if let Some(existing) = data.nulls() { + NullBuffer::new( + existing.inner() & idx_nulls.inner(), + ) + } else { + idx_nulls.clone() + }; + arrow::array::make_array( + data.into_builder() + .null_bit_buffer(Some( + combined_nulls.into_inner().into_inner(), + )) + .build()?, + ) + } else { + result + } + } else { + result + } } else { - take(array.as_ref(), build_indices, None)? + // All build indices are null (outer join with no matches) + let data_type = if build_batches.is_empty() { + schema + .field(column_index.index) + .data_type() + .clone() + } else { + build_batches[0] + .column(column_index.index) + .data_type() + .clone() + }; + new_null_array(&data_type, build_indices.len()) } } else { let array = probe_batch.column(column_index.index); From 622d251608ac48464675ec583d02f47ae53a12bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 15:13:31 +0100 Subject: [PATCH 02/26] Fmt --- .../physical-plan/src/joins/hash_join/exec.rs | 5 ++++- datafusion/physical-plan/src/joins/utils.rs | 14 ++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6aba4c7ae2af8..7bd93a30a5fe6 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -195,7 +195,10 @@ fn concat_key_value_arrays<'a>( ) -> Result> { let batches: Vec<&RecordBatch> = batches.collect(); if batches.is_empty() || on.is_empty() { - return Ok(on.iter().map(|_| new_null_array(&DataType::Null, 0)).collect()); + return Ok(on + .iter() + .map(|_| new_null_array(&DataType::Null, 0)) + .collect()); } // Evaluate key expressions for each batch let per_batch_keys: Vec> = batches diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 557e9d69c3e1d..05f40911a17f3 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1045,7 +1045,10 @@ pub(crate) fn build_batch_from_indices( { None } else { - Some(flat_indices_to_interleave(build_indices, build_batch_offsets)) + Some(flat_indices_to_interleave( + build_indices, + build_batch_offsets, + )) }; let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); @@ -1068,9 +1071,7 @@ pub(crate) fn build_batch_from_indices( // Combine the existing data nulls with the build_indices nulls let data = result.to_data(); let combined_nulls = if let Some(existing) = data.nulls() { - NullBuffer::new( - existing.inner() & idx_nulls.inner(), - ) + NullBuffer::new(existing.inner() & idx_nulls.inner()) } else { idx_nulls.clone() }; @@ -1090,10 +1091,7 @@ pub(crate) fn build_batch_from_indices( } else { // All build indices are null (outer join with no matches) let data_type = if build_batches.is_empty() { - schema - .field(column_index.index) - .data_type() - .clone() + schema.field(column_index.index).data_type().clone() } else { build_batches[0] .column(column_index.index) From 75ea2a478535ce858e43dc201f22093fcb360d51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 15:36:09 +0100 Subject: [PATCH 03/26] Avoid interleave for single batch --- datafusion/physical-plan/src/joins/utils.rs | 46 +++++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 05f40911a17f3..7c5e2a7af0b37 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1039,16 +1039,19 @@ pub(crate) fn build_batch_from_indices( return new_empty_schema_batch(schema, row_count); } - // Pre-compute interleave indices for build-side columns (shared across all build columns) - let interleave_indices = if build_batches.is_empty() - || build_indices.null_count() == build_indices.len() + // For a single batch, use take() directly (no index conversion overhead). + // For multiple batches, pre-compute interleave indices. + let use_single_batch = build_batches.len() == 1; + let interleave_indices = if !use_single_batch + && !build_batches.is_empty() + && build_indices.null_count() != build_indices.len() { - None - } else { Some(flat_indices_to_interleave( build_indices, build_batch_offsets, )) + } else { + None }; let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); @@ -1057,8 +1060,25 @@ pub(crate) fn build_batch_from_indices( let array = if column_index.side == JoinSide::None { Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { - if let Some(ref il_indices) = interleave_indices { - // Gather column arrays from all build batches + if build_batches.is_empty() + || build_indices.null_count() == build_indices.len() + { + // All build indices are null (outer join with no matches) + let data_type = if build_batches.is_empty() { + schema.field(column_index.index).data_type().clone() + } else { + build_batches[0] + .column(column_index.index) + .data_type() + .clone() + }; + new_null_array(&data_type, build_indices.len()) + } else if use_single_batch { + // Fast path: single batch, use take() directly (no conversion) + let array = build_batches[0].column(column_index.index); + take(array.as_ref(), build_indices, None)? + } else if let Some(ref il_indices) = interleave_indices { + // Multi-batch path: gather from all build batches via interleave let arrays: Vec<&dyn Array> = build_batches .iter() .map(|b| b.column(column_index.index).as_ref()) @@ -1068,7 +1088,6 @@ pub(crate) fn build_batch_from_indices( // unmatched rows are represented as null indices) if build_indices.null_count() > 0 { if let Some(idx_nulls) = build_indices.nulls() { - // Combine the existing data nulls with the build_indices nulls let data = result.to_data(); let combined_nulls = if let Some(existing) = data.nulls() { NullBuffer::new(existing.inner() & idx_nulls.inner()) @@ -1089,16 +1108,7 @@ pub(crate) fn build_batch_from_indices( result } } else { - // All build indices are null (outer join with no matches) - let data_type = if build_batches.is_empty() { - schema.field(column_index.index).data_type().clone() - } else { - build_batches[0] - .column(column_index.index) - .data_type() - .clone() - }; - new_null_array(&data_type, build_indices.len()) + unreachable!("interleave_indices should be Some for multi-batch non-null case") } } else { let array = probe_batch.column(column_index.index); From d99cf539c61c65e386744cda010b5bc080dd2ade Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 15:38:37 +0100 Subject: [PATCH 04/26] Avoid interleave for single batch --- datafusion/physical-plan/src/joins/utils.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 7c5e2a7af0b37..d4464d3b3a161 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1108,7 +1108,9 @@ pub(crate) fn build_batch_from_indices( result } } else { - unreachable!("interleave_indices should be Some for multi-batch non-null case") + unreachable!( + "interleave_indices should be Some for multi-batch non-null case" + ) } } else { let array = probe_batch.column(column_index.index); From 21eb596980e58e83d8f5656151155ca4b86a8979 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 16:18:22 +0100 Subject: [PATCH 05/26] WIP --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 7bd93a30a5fe6..f83997acbd044 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -283,11 +283,6 @@ impl JoinLeftData { &self.batch_offsets } - /// returns total number of rows across all build batches - pub(super) fn num_rows(&self) -> usize { - *self.batch_offsets.last().unwrap_or(&0) - } - /// returns a reference to the build side expressions values pub(super) fn values(&self) -> &[ArrayRef] { &self.values From b808be6d097c0922c3465990d561c0c620b43aee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 17:03:06 +0100 Subject: [PATCH 06/26] WIP --- .../physical-plan/src/joins/hash_join/exec.rs | 241 ++++++++++-------- .../src/joins/hash_join/stream.rs | 53 ++-- .../src/joins/symmetric_hash_join.rs | 9 +- datafusion/physical-plan/src/joins/utils.rs | 146 ++++++----- 4 files changed, 261 insertions(+), 188 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index f83997acbd044..51309baca2327 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,7 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, new_null_array}; +use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, UInt64Array, new_null_array}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -177,43 +177,50 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - // Evaluate key expressions per-batch and concatenate only key columns - let left_values = concat_key_value_arrays(on_left, batches.iter())?; + // Evaluate key expressions per-batch and concatenate for ArrayMap construction + let per_batch_keys: Vec> = batches + .iter() + .map(|batch| evaluate_expressions_to_arrays(on_left, batch)) + .collect::>>()?; + let left_values: Vec = if per_batch_keys.is_empty() || on_left.is_empty() { + on_left + .iter() + .map(|_| new_null_array(&DataType::Null, 0)) + .collect() + } else { + let num_keys = on_left.len(); + (0..num_keys) + .map(|key_idx| { + let arrays: Vec<&dyn Array> = per_batch_keys + .iter() + .map(|keys| keys[key_idx].as_ref()) + .collect(); + Ok(arrow::compute::concat(&arrays)?) + }) + .collect::>>()? + }; let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; Ok(Some((array_map, left_values))) } -/// Evaluate key expressions per-batch and concatenate only the resulting key -/// column arrays. This avoids `concat_batches` on the full data (which would -/// duplicate all columns in memory) while still providing contiguous key arrays -/// for `equal_rows_arr`. -fn concat_key_value_arrays<'a>( - on: &[PhysicalExprRef], - batches: impl Iterator, -) -> Result> { - let batches: Vec<&RecordBatch> = batches.collect(); - if batches.is_empty() || on.is_empty() { - return Ok(on - .iter() - .map(|_| new_null_array(&DataType::Null, 0)) - .collect()); - } - // Evaluate key expressions for each batch - let per_batch_keys: Vec> = batches +/// Convert flat indices (used in the visited bitmap) to packed composite indices +/// `(batch_idx << 32) | row_idx` used by `build_batch_from_indices`. +pub(super) fn flat_to_packed_indices( + flat_indices: &UInt64Array, + batch_offsets: &[usize], +) -> UInt64Array { + flat_indices + .values() .iter() - .map(|batch| evaluate_expressions_to_arrays(on, batch)) - .collect::>>()?; - // Transpose and concatenate: for each key column, concat across all batches - let num_keys = on.len(); - (0..num_keys) - .map(|key_idx| { - let arrays: Vec<&dyn Array> = per_batch_keys - .iter() - .map(|keys| keys[key_idx].as_ref()) - .collect(); - Ok(arrow::compute::concat(&arrays)?) + .map(|&flat_val| { + let flat = flat_val as usize; + let batch_idx = batch_offsets + .partition_point(|&o| o <= flat) + .saturating_sub(1); + let row_idx = flat - batch_offsets[batch_idx]; + ((batch_idx as u64) << 32) | (row_idx as u64) }) .collect() } @@ -231,8 +238,9 @@ pub(super) struct JoinLeftData { /// batch_offsets[i] is the starting flat index for batches[i]. /// Has length batches.len() + 1 (last element is total row count). batch_offsets: Vec, - /// The build side on expressions values (concatenated key columns only) - values: Vec, + /// The build side key expression values, per batch. + /// values_per_batch[batch_idx][key_idx] is the key column array for that batch. + values_per_batch: Vec>, /// Shared bitmap builder for visited left indices visited_indices_bitmap: SharedBitmapBuilder, /// Counter of running probe-threads, potentially @@ -283,9 +291,9 @@ impl JoinLeftData { &self.batch_offsets } - /// returns a reference to the build side expressions values - pub(super) fn values(&self) -> &[ArrayRef] { - &self.values + /// returns a reference to the build side key expressions values, per batch + pub(super) fn values_per_batch(&self) -> &[Vec] { + &self.values_per_batch } /// returns a reference to the visited indices bitmap @@ -2018,8 +2026,8 @@ async fn collect_left_input( _ => None, }; - let (join_hash_map, left_values, should_reverse_batches) = - if let Some((array_map, left_value)) = try_create_array_map( + let (join_hash_map, should_reverse_batches) = if let Some((array_map, _left_value)) = + try_create_array_map( &bounds, &schema, &batches, @@ -2029,61 +2037,57 @@ async fn collect_left_input( config.execution.perfect_hash_join_min_key_density, null_equality, )? { - array_map_created_count.add(1); - metrics.build_mem_used.add(array_map.size()); + array_map_created_count.add(1); + metrics.build_mem_used.add(array_map.size()); - (Map::ArrayMap(array_map), left_value, false) + (Map::ArrayMap(array_map), false) + } else { + // Estimation of memory size, required for hashtable, prior to allocation. + // Final result can be verified using `RawTable.allocation_info()` + let fixed_size_u32 = size_of::(); + let fixed_size_u64 = size_of::(); + + // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the + // `u64` indice variant + // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown + let mut hashmap: Box = if num_rows > u32::MAX as usize { + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU64::with_capacity(num_rows)) } else { - // Estimation of memory size, required for hashtable, prior to allocation. - // Final result can be verified using `RawTable.allocation_info()` - let fixed_size_u32 = size_of::(); - let fixed_size_u64 = size_of::(); - - // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the - // `u64` indice variant - // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown - let mut hashmap: Box = if num_rows > u32::MAX as usize { - let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU64::with_capacity(num_rows)) - } else { - let estimated_hashtable_size = - estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU32::with_capacity(num_rows)) - }; - - let mut hashes_buffer = Vec::new(); - let mut offset = 0; - - let batches_iter = batches.iter().rev(); - - // Updating hashmap starting from the last batch - for batch in batches_iter.clone() { - hashes_buffer.clear(); - hashes_buffer.resize(batch.num_rows(), 0); - update_hash( - &on_left, - batch, - &mut *hashmap, - offset, - &random_state, - &mut hashes_buffer, - 0, - true, - )?; - offset += batch.num_rows(); - } + let estimated_hashtable_size = + estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU32::with_capacity(num_rows)) + }; - // Evaluate key expressions per-batch and concatenate only the key columns - // (avoids concat_batches on the full data, saving memory) - let left_values = concat_key_value_arrays(&on_left, batches_iter.clone())?; + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + + let batches_iter = batches.iter().rev(); + + // Updating hashmap starting from the last batch + for batch in batches_iter.clone() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + batch, + &mut *hashmap, + offset, + &random_state, + &mut hashes_buffer, + 0, + true, + )?; + offset += batch.num_rows(); + } - (Map::HashMap(hashmap), left_values, true) - }; + (Map::HashMap(hashmap), true) + }; // For HashMap: reverse the batches to match the hash map's flat index ordering. // The hash map is built iterating in reverse (last batch first), so flat index 0 @@ -2096,7 +2100,27 @@ async fn collect_left_input( batches.reverse(); } - // Compute batch_offsets (prefix sum of batch sizes) for flat-to-(batch,row) conversion + // Evaluate key expressions per-batch (no concatenation needed for equal_rows_arr) + let values_per_batch: Vec> = if batches.is_empty() { + let empty_batch = RecordBatch::new_empty(Arc::clone(&schema)); + let empty_keys = on_left + .iter() + .map(|c| c.evaluate(&empty_batch)?.into_array(0)) + .collect::>>()?; + vec![empty_keys] + } else { + batches + .iter() + .map(|batch| { + on_left + .iter() + .map(|c| c.evaluate(batch)?.into_array(batch.num_rows())) + .collect::>>() + }) + .collect::>>()? + }; + + // Compute batch_offsets (prefix sum of batch sizes) for flat-to-packed index conversion let batch_offsets = { let mut offsets = Vec::with_capacity(batches.len() + 1); offsets.push(0usize); @@ -2124,15 +2148,30 @@ async fn collect_left_input( let membership = if num_rows == 0 { PushdownStrategy::Empty } else { - // If the build side is small enough we can use IN list pushdown. - // If it's too big we fall back to pushing down a reference to the hash table. - // See `PushdownStrategy` for more details. - let estimated_size = left_values + // For membership testing, we need concatenated key columns. + // Concat from the per-batch values. + let concat_values: Vec = + if values_per_batch.is_empty() || values_per_batch[0].is_empty() { + vec![] + } else { + let num_keys = values_per_batch[0].len(); + (0..num_keys) + .map(|key_idx| { + let arrays: Vec<&dyn Array> = values_per_batch + .iter() + .map(|keys| keys[key_idx].as_ref()) + .collect(); + Ok(arrow::compute::concat(&arrays)?) + }) + .collect::>>()? + }; + + let estimated_size = concat_values .iter() .map(|arr| arr.get_array_memory_size()) .sum::(); - if left_values.is_empty() - || left_values[0].is_empty() + if concat_values.is_empty() + || concat_values[0].is_empty() || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size || map.num_of_distinct_key() > config @@ -2140,7 +2179,7 @@ async fn collect_left_input( .hash_join_inlist_pushdown_max_distinct_values { PushdownStrategy::Map(Arc::clone(&map)) - } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? { + } else if let Some(in_list_values) = build_struct_inlist_values(&concat_values)? { PushdownStrategy::InList(in_list_values) } else { PushdownStrategy::Map(Arc::clone(&map)) @@ -2156,7 +2195,7 @@ async fn collect_left_input( schema: Arc::clone(&schema), batches, batch_offsets, - values: left_values, + values_per_batch, visited_indices_bitmap: Mutex::new(visited_indices_bitmap), probe_threads_counter: AtomicUsize::new(probe_threads_count), _reservation: reservation, @@ -4416,7 +4455,8 @@ mod tests { let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &[vec![left_keys_values]], + &[0, left.num_rows()], &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, @@ -4477,7 +4517,8 @@ mod tests { let mut build_indices_buffer = Vec::new(); let (l, r, _) = lookup_join_hashmap( &join_hash_map, - &[left_keys_values], + &[vec![left_keys_values]], + &[0, left.num_rows()], &[right_keys_values], NullEquality::NullEqualsNothing, &hashes_buffer, diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 5fb3e01deda0d..28a0c4adddf29 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -285,7 +285,8 @@ impl RecordBatchStream for HashJoinStream { #[expect(clippy::too_many_arguments)] pub(super) fn lookup_join_hashmap( build_hashmap: &dyn JoinHashMapType, - build_side_values: &[ArrayRef], + build_side_values_per_batch: &[Vec], + batch_offsets: &[usize], probe_side_values: &[ArrayRef], null_equality: NullEquality, hashes_buffer: &[u64], @@ -302,8 +303,11 @@ pub(super) fn lookup_join_hashmap( build_indices_buffer, ); + // Convert flat indices from hash map to packed composite indices let build_indices_unfiltered: UInt64Array = std::mem::take(build_indices_buffer).into(); + let build_indices_unfiltered = + super::exec::flat_to_packed_indices(&build_indices_unfiltered, batch_offsets); let probe_indices_unfiltered: UInt32Array = std::mem::take(probe_indices_buffer).into(); @@ -312,7 +316,7 @@ pub(super) fn lookup_join_hashmap( let (build_indices, probe_indices) = equal_rows_arr( &build_indices_unfiltered, &probe_indices_unfiltered, - build_side_values, + build_side_values_per_batch, probe_side_values, null_equality, )?; @@ -666,7 +670,8 @@ impl HashJoinStream { { Map::HashMap(map) => lookup_join_hashmap( map.as_ref(), - build_side.left_data.values(), + build_side.left_data.values_per_batch(), + build_side.left_data.batch_offsets(), &state.values, self.null_equality, &self.hashes_buffer, @@ -683,8 +688,15 @@ impl HashJoinStream { &mut self.probe_indices_buffer, &mut self.build_indices_buffer, )?; + // ArrayMap returns flat indices; convert to packed composite indices + let flat_build_indices: UInt64Array = + self.build_indices_buffer.clone().into(); + let packed_build_indices = super::exec::flat_to_packed_indices( + &flat_build_indices, + build_side.left_data.batch_offsets(), + ); ( - UInt64Array::from(self.build_indices_buffer.clone()), + packed_build_indices, UInt32Array::from(self.probe_indices_buffer.clone()), next_offset, ) @@ -707,7 +719,6 @@ impl HashJoinStream { let (left_indices, right_indices) = if let Some(filter) = &self.filter { apply_join_filter_to_indices( build_side.left_data.batches(), - build_side.left_data.batch_offsets(), &state.batch, left_indices, right_indices, @@ -721,10 +732,16 @@ impl HashJoinStream { }; // mark joined left-side indices as visited, if required by join type + // Convert packed indices to flat indices for the bitmap if need_produce_result_in_final(self.join_type) { + let batch_offsets = build_side.left_data.batch_offsets(); let mut bitmap = build_side.left_data.visited_indices_bitmap().lock(); left_indices.iter().flatten().for_each(|x| { - bitmap.set_bit(x as usize, true); + let packed = x as u64; + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + let flat = batch_offsets[batch_idx] + row_idx; + bitmap.set_bit(flat, true); }); } @@ -769,11 +786,9 @@ impl HashJoinStream { // Build output batch and push to coalescer let batch = if self.join_type == JoinType::RightMark { - let num_rows = state.batch.num_rows(); build_batch_from_indices( &self.schema, std::slice::from_ref(&state.batch), - &[0, num_rows], &build_side.left_data.batch(), &left_indices, &right_indices, @@ -785,7 +800,6 @@ impl HashJoinStream { build_batch_from_indices( &self.schema, build_side.left_data.batches(), - build_side.left_data.batch_offsets(), &state.batch, &left_indices, &right_indices, @@ -852,11 +866,16 @@ impl HashJoinStream { } // use the global left bitmap to produce the left indices and right indices - let (mut left_side, mut right_side) = get_final_indices_from_shared_bitmap( + // The bitmap uses flat indices; convert them to packed composite indices + // for build_batch_from_indices. + let (flat_left_side, mut right_side) = get_final_indices_from_shared_bitmap( build_side.left_data.visited_indices_bitmap(), self.join_type, true, ); + let batch_offsets = build_side.left_data.batch_offsets(); + let mut left_side = + super::exec::flat_to_packed_indices(&flat_left_side, batch_offsets); // For null-aware anti join, filter out LEFT rows with NULL in join keys // BUT only if the probe side (RIGHT) was non-empty. If probe side is empty, @@ -870,17 +889,20 @@ impl HashJoinStream { .load(Ordering::Relaxed) { // Since null_aware validation ensures single column join, we only check the first column - let build_key_column = &build_side.left_data.values()[0]; + let values_per_batch = build_side.left_data.values_per_batch(); // Filter out indices where the key is NULL + // left_side contains packed indices: (batch_idx << 32) | row_idx let filtered_indices: Vec = left_side .iter() - .filter_map(|idx| { - let idx_usize = idx.unwrap() as usize; - if build_key_column.is_null(idx_usize) { + .filter_map(|idx: Option| { + let packed = idx.unwrap(); + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + if values_per_batch[batch_idx][0].is_null(row_idx) { None // Skip rows with NULL keys } else { - Some(idx.unwrap()) + Some(packed) } }) .collect(); @@ -906,7 +928,6 @@ impl HashJoinStream { let batch = build_batch_from_indices( &self.schema, build_side.left_data.batches(), - build_side.left_data.batch_offsets(), &empty_right_batch, &left_side, &right_side, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 403018a898ae9..b8adc77e771db 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -951,11 +951,9 @@ pub(crate) fn build_side_determined_results( // Create an empty probe record batch: let empty_probe_batch = RecordBatch::new_empty(probe_schema); // Build the final result from the indices of build and probe sides: - let num_rows = build_hash_joiner.input_buffer.num_rows(); build_batch_from_indices( output_schema.as_ref(), std::slice::from_ref(&build_hash_joiner.input_buffer), - &[0, num_rows], &empty_probe_batch, &build_indices, &probe_indices, @@ -1017,10 +1015,8 @@ pub(crate) fn join_with_probe_batch( )?; let (build_indices, probe_indices) = if let Some(filter) = filter { - let num_rows = build_hash_joiner.input_buffer.num_rows(); apply_join_filter_to_indices( std::slice::from_ref(&build_hash_joiner.input_buffer), - &[0, num_rows], probe_batch, build_indices, probe_indices, @@ -1058,11 +1054,9 @@ pub(crate) fn join_with_probe_batch( ) { Ok(None) } else { - let num_rows = build_hash_joiner.input_buffer.num_rows(); build_batch_from_indices( schema, std::slice::from_ref(&build_hash_joiner.input_buffer), - &[0, num_rows], probe_batch, &build_indices, &probe_indices, @@ -1153,10 +1147,11 @@ fn lookup_join_hashmap( let build_indices: UInt64Array = matched_build.into(); let probe_indices: UInt32Array = matched_probe.into(); + let build_join_values_per_batch = [build_join_values]; let (build_indices, probe_indices) = equal_rows_arr( &build_indices, &probe_indices, - &build_join_values, + &build_join_values_per_batch, &keys_values, null_equality, )?; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index d4464d3b3a161..42f1384508714 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -910,10 +910,8 @@ pub(crate) fn get_final_indices_from_bit_map( (left_indices, right_indices) } -#[expect(clippy::too_many_arguments)] pub(crate) fn apply_join_filter_to_indices( build_batches: &[RecordBatch], - build_batch_offsets: &[usize], probe_batch: &RecordBatch, build_indices: UInt64Array, probe_indices: UInt32Array, @@ -936,7 +934,6 @@ pub(crate) fn apply_join_filter_to_indices( let intermediate_batch = build_batch_from_indices( filter.schema(), build_batches, - build_batch_offsets, probe_batch, &build_indices.slice(i, len), &probe_indices.slice(i, len), @@ -959,7 +956,6 @@ pub(crate) fn apply_join_filter_to_indices( let intermediate_batch = build_batch_from_indices( filter.schema(), build_batches, - build_batch_offsets, probe_batch, &build_indices, &probe_indices, @@ -995,20 +991,17 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result Vec<(usize, usize)> { +/// Decode packed composite indices into (batch_index, row_index) pairs. +/// +/// Each u64 value encodes `(batch_idx << 32) | row_idx`. +/// For single-batch cases (batch_idx == 0), the packed value equals the row index. +fn packed_indices_to_interleave(build_indices: &UInt64Array) -> Vec<(usize, usize)> { build_indices .values() .iter() - .map(|&flat_idx| { - let flat = flat_idx as usize; - // Binary search to find which batch this flat index belongs to - let batch_idx = batch_offsets.partition_point(|&o| o <= flat) - 1; - let row_idx = flat - batch_offsets[batch_idx]; + .map(|&packed| { + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; (batch_idx, row_idx) }) .collect() @@ -1017,13 +1010,13 @@ fn flat_indices_to_interleave( /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. /// The resulting batch has [Schema] `schema`. /// +/// Build-side indices are packed composite keys: `(batch_idx << 32) | row_idx`. /// Build-side columns are gathered from multiple batches using `interleave`, /// avoiding the need to `concat_batches` the entire build side upfront. #[expect(clippy::too_many_arguments)] pub(crate) fn build_batch_from_indices( schema: &Schema, build_batches: &[RecordBatch], - build_batch_offsets: &[usize], probe_batch: &RecordBatch, build_indices: &UInt64Array, probe_indices: &UInt32Array, @@ -1039,19 +1032,13 @@ pub(crate) fn build_batch_from_indices( return new_empty_schema_batch(schema, row_count); } - // For a single batch, use take() directly (no index conversion overhead). - // For multiple batches, pre-compute interleave indices. - let use_single_batch = build_batches.len() == 1; - let interleave_indices = if !use_single_batch - && !build_batches.is_empty() - && build_indices.null_count() != build_indices.len() + // Pre-compute interleave indices for build-side columns (shared across all build columns) + let interleave_indices = if build_batches.is_empty() + || build_indices.null_count() == build_indices.len() { - Some(flat_indices_to_interleave( - build_indices, - build_batch_offsets, - )) - } else { None + } else { + Some(packed_indices_to_interleave(build_indices)) }; let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); @@ -1060,25 +1047,8 @@ pub(crate) fn build_batch_from_indices( let array = if column_index.side == JoinSide::None { Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { - if build_batches.is_empty() - || build_indices.null_count() == build_indices.len() - { - // All build indices are null (outer join with no matches) - let data_type = if build_batches.is_empty() { - schema.field(column_index.index).data_type().clone() - } else { - build_batches[0] - .column(column_index.index) - .data_type() - .clone() - }; - new_null_array(&data_type, build_indices.len()) - } else if use_single_batch { - // Fast path: single batch, use take() directly (no conversion) - let array = build_batches[0].column(column_index.index); - take(array.as_ref(), build_indices, None)? - } else if let Some(ref il_indices) = interleave_indices { - // Multi-batch path: gather from all build batches via interleave + if let Some(ref il_indices) = interleave_indices { + // Gather column arrays from all build batches let arrays: Vec<&dyn Array> = build_batches .iter() .map(|b| b.column(column_index.index).as_ref()) @@ -1108,9 +1078,16 @@ pub(crate) fn build_batch_from_indices( result } } else { - unreachable!( - "interleave_indices should be Some for multi-batch non-null case" - ) + // All build indices are null (outer join with no matches) + let data_type = if build_batches.is_empty() { + schema.field(column_index.index).data_type().clone() + } else { + build_batches[0] + .column(column_index.index) + .data_type() + .clone() + }; + new_null_array(&data_type, build_indices.len()) } } else { let array = probe_batch.column(column_index.index); @@ -1840,34 +1817,73 @@ pub fn update_hash( Ok(()) } +/// Compares build-side and probe-side key columns for equality, filtering +/// out non-matching rows after hash collision resolution. +/// +/// Build-side indices are packed composite keys: `(batch_idx << 32) | row_idx`. +/// `left_arrays_per_batch[batch_idx][key_idx]` holds key columns per batch. +/// +/// For single-batch builds (the common case), uses `take` directly since +/// batch_idx == 0 means packed values equal row indices. For multi-batch +/// builds, uses `interleave` to gather from the correct batches. pub(super) fn equal_rows_arr( indices_left: &UInt64Array, indices_right: &UInt32Array, - left_arrays: &[ArrayRef], + left_arrays_per_batch: &[Vec], right_arrays: &[ArrayRef], null_equality: NullEquality, ) -> Result<(UInt64Array, UInt32Array)> { - let mut iter = left_arrays.iter().zip(right_arrays.iter()); + if left_arrays_per_batch.is_empty() || right_arrays.is_empty() { + return Err(DataFusionError::Internal( + "At least one array should be provided for both left and right".to_string(), + )); + } - let Some((first_left, first_right)) = iter.next() else { - return Ok((Vec::::new().into(), Vec::::new().into())); + let num_keys = right_arrays.len(); + + // Gather left-side key values using packed indices + let gather_left_key = |key_idx: usize| -> Result { + if left_arrays_per_batch.len() == 1 { + // Single batch: packed value == row index (batch_idx is 0), + // so we can use take directly with UInt32 indices. + let row_indices: UInt32Array = + indices_left.values().iter().map(|&v| v as u32).collect(); + Ok(take( + left_arrays_per_batch[0][key_idx].as_ref(), + &row_indices, + None, + )?) + } else { + // Multiple batches: decode packed indices and use interleave + let arrays: Vec<&dyn Array> = left_arrays_per_batch + .iter() + .map(|batch_keys| batch_keys[key_idx].as_ref()) + .collect(); + let il_indices: Vec<(usize, usize)> = indices_left + .values() + .iter() + .map(|&packed| { + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + (batch_idx, row_idx) + }) + .collect(); + Ok(compute::interleave(&arrays, &il_indices)?) + } }; - let arr_left = take(first_left.as_ref(), indices_left, None)?; - let arr_right = take(first_right.as_ref(), indices_right, None)?; + let arr_left = gather_left_key(0)?; + let arr_right = take(right_arrays[0].as_ref(), indices_right, None)?; let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; - // Use map and try_fold to iterate over the remaining pairs of arrays. - // In each iteration, take is used on the pair of arrays and their equality is determined. - // The results are then folded (combined) using the and function to get a final equality result. - equal = iter - .map(|(left, right)| { - let arr_left = take(left.as_ref(), indices_left, None)?; - let arr_right = take(right.as_ref(), indices_right, None)?; - eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality) - }) - .try_fold(equal, |acc, equal2| and(&acc, &equal2?))?; + for key_idx in 1..num_keys { + let arr_left = gather_left_key(key_idx)?; + let arr_right = take(right_arrays[key_idx].as_ref(), indices_right, None)?; + let eq_result = + eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)?; + equal = and(&equal, &eq_result)?; + } let filter_builder = FilterBuilder::new(&equal).optimize().build(); From 627522304f5844d693e61e8ddf0939f23c6a5d17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 17:12:17 +0100 Subject: [PATCH 07/26] WIP --- datafusion/physical-plan/src/joins/utils.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 42f1384508714..661b5820899e5 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1079,14 +1079,12 @@ pub(crate) fn build_batch_from_indices( } } else { // All build indices are null (outer join with no matches) - let data_type = if build_batches.is_empty() { - schema.field(column_index.index).data_type().clone() - } else { - build_batches[0] - .column(column_index.index) - .data_type() - .clone() - }; + // Use the build batch schema to get the correct data type + // (column_index.index refers to the build batch columns, not the output schema) + let data_type = build_batches + .first() + .map(|b| b.column(column_index.index).data_type().clone()) + .unwrap_or_else(|| schema.field(column_index.index).data_type().clone()); new_null_array(&data_type, build_indices.len()) } } else { From 3edef0ae03bfd0b975ed916295d9436313a8f751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 17:18:48 +0100 Subject: [PATCH 08/26] WIP --- datafusion/physical-plan/src/joins/utils.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 661b5820899e5..a43696921cea5 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1043,7 +1043,7 @@ pub(crate) fn build_batch_from_indices( let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); - for column_index in column_indices { + for (output_idx, column_index) in column_indices.iter().enumerate() { let array = if column_index.side == JoinSide::None { Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { @@ -1084,7 +1084,7 @@ pub(crate) fn build_batch_from_indices( let data_type = build_batches .first() .map(|b| b.column(column_index.index).data_type().clone()) - .unwrap_or_else(|| schema.field(column_index.index).data_type().clone()); + .unwrap_or_else(|| schema.field(output_idx).data_type().clone()); new_null_array(&data_type, build_indices.len()) } } else { From f715910f03bb9064266aa0b62a4f0bd39a56c4de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 18:44:40 +0100 Subject: [PATCH 09/26] WIP --- datafusion/physical-plan/src/joins/utils.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a43696921cea5..e5b20dbe248ab 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1084,7 +1084,9 @@ pub(crate) fn build_batch_from_indices( let data_type = build_batches .first() .map(|b| b.column(column_index.index).data_type().clone()) - .unwrap_or_else(|| schema.field(output_idx).data_type().clone()); + .unwrap_or_else(|| { + schema.field(columns.len()).data_type().clone() + }); new_null_array(&data_type, build_indices.len()) } } else { From 22b11e9ad0a1918f4fe6cd8b202438c549415a0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 18:56:12 +0100 Subject: [PATCH 10/26] WIP --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 12 +++++++++--- datafusion/physical-plan/src/joins/utils.rs | 6 ++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 51309baca2327..0b4bd0ad407fa 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2100,12 +2100,18 @@ async fn collect_left_input( batches.reverse(); } + // Ensure there is always at least one batch for schema/data-type lookups + // (e.g. build_batch_from_indices needs the build-side data types even when + // there are no build-side rows, for producing null arrays in outer joins). + if batches.is_empty() { + batches.push(RecordBatch::new_empty(Arc::clone(&schema))); + } + // Evaluate key expressions per-batch (no concatenation needed for equal_rows_arr) - let values_per_batch: Vec> = if batches.is_empty() { - let empty_batch = RecordBatch::new_empty(Arc::clone(&schema)); + let values_per_batch: Vec> = if batches[0].num_rows() == 0 { let empty_keys = on_left .iter() - .map(|c| c.evaluate(&empty_batch)?.into_array(0)) + .map(|c| c.evaluate(&batches[0])?.into_array(0)) .collect::>>()?; vec![empty_keys] } else { diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e5b20dbe248ab..84459a3f9e55c 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1043,7 +1043,7 @@ pub(crate) fn build_batch_from_indices( let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); - for (output_idx, column_index) in column_indices.iter().enumerate() { + for column_index in column_indices { let array = if column_index.side == JoinSide::None { Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { @@ -1084,9 +1084,7 @@ pub(crate) fn build_batch_from_indices( let data_type = build_batches .first() .map(|b| b.column(column_index.index).data_type().clone()) - .unwrap_or_else(|| { - schema.field(columns.len()).data_type().clone() - }); + .unwrap_or_else(|| schema.field(columns.len()).data_type().clone()); new_null_array(&data_type, build_indices.len()) } } else { From 78196a75d3999d0fcdcbef0148f45fbaaa600963 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 18:56:35 +0100 Subject: [PATCH 11/26] WIP --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 0b4bd0ad407fa..e86762dd593a9 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2108,7 +2108,7 @@ async fn collect_left_input( } // Evaluate key expressions per-batch (no concatenation needed for equal_rows_arr) - let values_per_batch: Vec> = if batches[0].num_rows() == 0 { + let values_per_batch: Vec> = if num_rows == 0 { let empty_keys = on_left .iter() .map(|c| c.evaluate(&batches[0])?.into_array(0)) From 6f98d2530bac492870c69a5135bbd21eba30f4e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 20:11:49 +0100 Subject: [PATCH 12/26] WIP --- datafusion/physical-plan/src/coalesce/mod.rs | 3 +++ .../physical-plan/src/joins/hash_join/exec.rs | 16 +++++++--------- .../physical-plan/src/joins/nested_loop_join.rs | 4 +++- .../joins/piecewise_merge_join/classic_join.rs | 2 +- .../src/joins/sort_merge_join/stream.rs | 6 +++--- datafusion/physical-plan/src/joins/utils.rs | 11 +++++++---- 6 files changed, 24 insertions(+), 18 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs index ea1a87d091481..1c8508ecba1f3 100644 --- a/datafusion/physical-plan/src/coalesce/mod.rs +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -59,6 +59,9 @@ impl LimitedBatchCoalescer { target_batch_size: usize, fetch: Option, ) -> Self { + // Use at least 8 to avoid debug_assert in arrow-select's BatchCoalescer + // where Vec::reserve(n) for small n can allocate more capacity than n + let target_batch_size = target_batch_size.max(8); Self { inner: BatchCoalescer::new(schema, target_batch_size) .with_biggest_coalesce_batch_size(Some(target_batch_size / 2)), diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index e86762dd593a9..33089a6f96349 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -2099,19 +2099,17 @@ async fn collect_left_input( if should_reverse_batches { batches.reverse(); } - - // Ensure there is always at least one batch for schema/data-type lookups - // (e.g. build_batch_from_indices needs the build-side data types even when - // there are no build-side rows, for producing null arrays in outer joins). - if batches.is_empty() { - batches.push(RecordBatch::new_empty(Arc::clone(&schema))); - } + // Remove empty batches to avoid ambiguity in flat_to_packed_indices: + // duplicate offsets from empty batches cause partition_point to return + // incorrect batch indices. + batches.retain(|b| b.num_rows() > 0); // Evaluate key expressions per-batch (no concatenation needed for equal_rows_arr) - let values_per_batch: Vec> = if num_rows == 0 { + let values_per_batch: Vec> = if batches.is_empty() { + let empty_batch = RecordBatch::new_empty(Arc::clone(&schema)); let empty_keys = on_left .iter() - .map(|c| c.evaluate(&batches[0])?.into_array(0)) + .map(|c| c.evaluate(&empty_batch)?.into_array(0)) .collect::>>()?; vec![empty_keys] } else { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index f84cb54dac948..01b48f94c2c90 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1147,7 +1147,9 @@ impl NestedLoopJoinStream { left_data, metrics, buffered_left_data: None, - output_buffer: Box::new(BatchCoalescer::new(schema, batch_size)), + // Use at least 8 to avoid debug_assert in arrow-select's BatchCoalescer + // where Vec::reserve(n) for small n can allocate more capacity than n + output_buffer: Box::new(BatchCoalescer::new(schema, batch_size.max(8))), batch_size, current_right_batch: None, current_right_batch_matched: None, diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs index bb32a222de962..5d07a91673d40 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs @@ -416,7 +416,7 @@ struct BatchProcessState { impl BatchProcessState { pub(crate) fn new(schema: Arc, batch_size: usize) -> Self { Self { - output_batches: Box::new(BatchCoalescer::new(schema, batch_size)), + output_batches: Box::new(BatchCoalescer::new(schema, batch_size.max(8))), unmatched_indices: PrimitiveBuilder::new(), start_buffer_idx: 0, start_stream_idx: 0, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 4dcbe1f647990..94650301b9461 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -402,7 +402,7 @@ impl JoinedRecordBatches { /// Clears batches without touching metadata (for early return when no filtering needed) fn clear_batches(&mut self, schema: &SchemaRef, batch_size: usize) { - self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size) + self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size.max(8)) .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)); } @@ -513,7 +513,7 @@ impl JoinedRecordBatches { } fn clear(&mut self, schema: &SchemaRef, batch_size: usize) { - self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size) + self.joined_batches = BatchCoalescer::new(Arc::clone(schema), batch_size.max(8)) .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)); self.filter_metadata = FilterMetadata::new(); self.debug_assert_empty_consistency(); @@ -785,7 +785,7 @@ impl SortMergeJoinStream { .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)), filter_metadata: FilterMetadata::new(), }, - output: BatchCoalescer::new(schema, batch_size) + output: BatchCoalescer::new(schema, batch_size.max(8)) .with_biggest_coalesce_batch_size(Option::from(batch_size / 2)), batch_size, join_type, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 84459a3f9e55c..e5002ccac1cf6 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1081,10 +1081,13 @@ pub(crate) fn build_batch_from_indices( // All build indices are null (outer join with no matches) // Use the build batch schema to get the correct data type // (column_index.index refers to the build batch columns, not the output schema) - let data_type = build_batches - .first() - .map(|b| b.column(column_index.index).data_type().clone()) - .unwrap_or_else(|| schema.field(columns.len()).data_type().clone()); + let data_type = if let Some(b) = build_batches.first() { + b.column(column_index.index).data_type().clone() + } else { + // No build batches available (empty build side in CollectLeft mode). + // Use the output schema field at the current column position. + schema.field(columns.len()).data_type().clone() + }; new_null_array(&data_type, build_indices.len()) } } else { From c5ed684bf813617ac57ac9e3dd2f5852384846b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 22:34:17 +0100 Subject: [PATCH 13/26] WIP --- datafusion/physical-plan/src/joins/nested_loop_join.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 01b48f94c2c90..f84cb54dac948 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1147,9 +1147,7 @@ impl NestedLoopJoinStream { left_data, metrics, buffered_left_data: None, - // Use at least 8 to avoid debug_assert in arrow-select's BatchCoalescer - // where Vec::reserve(n) for small n can allocate more capacity than n - output_buffer: Box::new(BatchCoalescer::new(schema, batch_size.max(8))), + output_buffer: Box::new(BatchCoalescer::new(schema, batch_size)), batch_size, current_right_batch: None, current_right_batch_matched: None, From 64c0e47b0d6bf9b1353e5bed464e8d8b09f91387 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Mar 2026 22:37:01 +0100 Subject: [PATCH 14/26] WIP --- datafusion/physical-plan/src/coalesce/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce/mod.rs b/datafusion/physical-plan/src/coalesce/mod.rs index 1c8508ecba1f3..ea1a87d091481 100644 --- a/datafusion/physical-plan/src/coalesce/mod.rs +++ b/datafusion/physical-plan/src/coalesce/mod.rs @@ -59,9 +59,6 @@ impl LimitedBatchCoalescer { target_batch_size: usize, fetch: Option, ) -> Self { - // Use at least 8 to avoid debug_assert in arrow-select's BatchCoalescer - // where Vec::reserve(n) for small n can allocate more capacity than n - let target_batch_size = target_batch_size.max(8); Self { inner: BatchCoalescer::new(schema, target_batch_size) .with_biggest_coalesce_batch_size(Some(target_batch_size / 2)), From b5380c9f60cb994435f10afb422827810c9fcfa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 06:53:36 +0100 Subject: [PATCH 15/26] WIP --- .../physical-plan/src/joins/array_map.rs | 80 ++++++++----- .../physical-plan/src/joins/hash_join/exec.rs | 69 ++++-------- .../src/joins/hash_join/inlist_builder.rs | 105 ++++++++++-------- 3 files changed, 134 insertions(+), 120 deletions(-) diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs index ad40d6776df4f..ff4a67fac1bf7 100644 --- a/datafusion/physical-plan/src/joins/array_map.rs +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -157,12 +157,17 @@ impl ArrayMap { max_val.wrapping_sub(min_val) } - /// Creates a new [`ArrayMap`] from the given array of join keys. + /// Creates a new [`ArrayMap`] from per-batch arrays of join keys. /// - /// Note: This function processes only the non-null values in the input `array`, + /// Note: This function processes only the non-null values in the input arrays, /// ignoring any rows where the key is `NULL`. /// - pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result { + pub(crate) fn try_new( + arrays: &[&ArrayRef], + total_num_rows: usize, + min_val: u64, + max_val: u64, + ) -> Result { let range = max_val.wrapping_sub(min_val); if range >= usize::MAX as u64 { return internal_err!("ArrayMap key range is too large to be allocated."); @@ -173,10 +178,16 @@ impl ArrayMap { let mut next: Vec = vec![]; let mut num_of_distinct_key = 0; + let data_type = arrays + .first() + .map(|a| a.data_type().clone()) + .unwrap_or(DataType::Int32); + downcast_supported_integer!( - array.data_type() => ( - fill_data, - array, + &data_type => ( + fill_data_batched, + arrays, + total_num_rows, min_val, &mut data, &mut next, @@ -192,8 +203,9 @@ impl ArrayMap { }) } - fn fill_data( - array: &ArrayRef, + fn fill_data_batched( + arrays: &[&ArrayRef], + total_num_rows: usize, offset_val: u64, data: &mut [u32], next: &mut Vec, @@ -202,25 +214,32 @@ impl ArrayMap { where T::Native: AsPrimitive, { - let arr = array.as_primitive::(); // Iterate in reverse to maintain FIFO order when there are duplicate keys. - for (i, val) in arr.iter().enumerate().rev() { - if let Some(val) = val { - let key: u64 = val.as_(); - let idx = key.wrapping_sub(offset_val) as usize; - if idx >= data.len() { - return internal_err!("failed build Array idx >= data.len()"); - } - - if data[idx] != 0 { - if next.is_empty() { - *next = vec![0; array.len()] + // We iterate batches in reverse, and within each batch iterate rows in reverse, + // using a flat index that spans all batches. + let mut flat_offset = total_num_rows; + for array in arrays.iter().rev() { + let arr = array.as_primitive::(); + flat_offset -= arr.len(); + for (row_idx, val) in arr.iter().enumerate().rev() { + if let Some(val) = val { + let key: u64 = val.as_(); + let idx = key.wrapping_sub(offset_val) as usize; + if idx >= data.len() { + return internal_err!("failed build Array idx >= data.len()"); } - next[i] = data[idx] - } else { - *num_of_distinct_key += 1; + let flat_idx = flat_offset + row_idx; + + if data[idx] != 0 { + if next.is_empty() { + *next = vec![0; total_num_rows] + } + next[flat_idx] = data[idx] + } else { + *num_of_distinct_key += 1; + } + data[idx] = flat_idx as u32 + 1; } - data[idx] = (i) as u32 + 1; } } Ok(()) @@ -419,7 +438,7 @@ mod tests { #[test] fn test_array_map_limit_offset_duplicate_elements() -> Result<()> { let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 2])); - let map = ArrayMap::try_new(&build, 1, 2)?; + let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?; let probe = [Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef]; let mut prob_idx = Vec::new(); @@ -450,7 +469,7 @@ mod tests { #[test] fn test_array_map_with_limit_and_misses() -> Result<()> { let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); - let map = ArrayMap::try_new(&build, 1, 2)?; + let map = ArrayMap::try_new(&[&build], build.len(), 1, 2)?; let probe = [Arc::new(Int32Array::from(vec![10, 1, 2])) as ArrayRef]; let (mut p_idx, mut b_idx) = (vec![], vec![]); @@ -483,7 +502,7 @@ mod tests { #[test] fn test_array_map_with_build_duplicates_and_misses() -> Result<()> { let build_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 1])); - let array_map = ArrayMap::try_new(&build_array, 1, 1)?; + let array_map = ArrayMap::try_new(&[&build_array], build_array.len(), 1, 1)?; // prob: 10(m), 1(h1, h2), 20(m), 1(h1, h2) let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 1, 20, 1])); let prob_side_keys = [probe_array]; @@ -513,7 +532,12 @@ mod tests { let min_val = -5_i128; let max_val = 10_i128; - let array_map = ArrayMap::try_new(&build_array, min_val as u64, max_val as u64)?; + let array_map = ArrayMap::try_new( + &[&build_array], + build_array.len(), + min_val as u64, + max_val as u64, + )?; // Probe array let probe_array: ArrayRef = Arc::new(Int64Array::from(vec![0, -5, 10, -1])); diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 33089a6f96349..e7c90af69aa9a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -65,7 +65,7 @@ use crate::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; -use arrow::array::{Array, ArrayRef, BooleanBufferBuilder, UInt64Array, new_null_array}; +use arrow::array::{ArrayRef, BooleanBufferBuilder, UInt64Array}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; @@ -112,7 +112,7 @@ fn try_create_array_map( perfect_hash_join_small_build_threshold: usize, perfect_hash_join_min_key_density: f64, null_equality: NullEquality, -) -> Result)>> { +) -> Result> { if on_left.len() != 1 { return Ok(None); } @@ -177,32 +177,19 @@ fn try_create_array_map( let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); reservation.try_grow(mem_size)?; - // Evaluate key expressions per-batch and concatenate for ArrayMap construction - let per_batch_keys: Vec> = batches + // Evaluate key expressions per-batch (no concatenation needed) + let per_batch_keys: Vec = batches .iter() - .map(|batch| evaluate_expressions_to_arrays(on_left, batch)) + .map(|batch| { + let arrays = evaluate_expressions_to_arrays(on_left, batch)?; + Ok(arrays.into_iter().next().unwrap()) + }) .collect::>>()?; - let left_values: Vec = if per_batch_keys.is_empty() || on_left.is_empty() { - on_left - .iter() - .map(|_| new_null_array(&DataType::Null, 0)) - .collect() - } else { - let num_keys = on_left.len(); - (0..num_keys) - .map(|key_idx| { - let arrays: Vec<&dyn Array> = per_batch_keys - .iter() - .map(|keys| keys[key_idx].as_ref()) - .collect(); - Ok(arrow::compute::concat(&arrays)?) - }) - .collect::>>()? - }; + let key_refs: Vec<&ArrayRef> = per_batch_keys.iter().collect(); - let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; + let array_map = ArrayMap::try_new(&key_refs, num_row, min_val, max_val)?; - Ok(Some((array_map, left_values))) + Ok(Some(array_map)) } /// Convert flat indices (used in the visited bitmap) to packed composite indices @@ -2026,7 +2013,7 @@ async fn collect_left_input( _ => None, }; - let (join_hash_map, should_reverse_batches) = if let Some((array_map, _left_value)) = + let (join_hash_map, should_reverse_batches) = if let Some(array_map) = try_create_array_map( &bounds, &schema, @@ -2152,30 +2139,14 @@ async fn collect_left_input( let membership = if num_rows == 0 { PushdownStrategy::Empty } else { - // For membership testing, we need concatenated key columns. - // Concat from the per-batch values. - let concat_values: Vec = - if values_per_batch.is_empty() || values_per_batch[0].is_empty() { - vec![] - } else { - let num_keys = values_per_batch[0].len(); - (0..num_keys) - .map(|key_idx| { - let arrays: Vec<&dyn Array> = values_per_batch - .iter() - .map(|keys| keys[key_idx].as_ref()) - .collect(); - Ok(arrow::compute::concat(&arrays)?) - }) - .collect::>>()? - }; - - let estimated_size = concat_values + // Estimate total size from per-batch values (avoid concatenation for size check) + let estimated_size: usize = values_per_batch .iter() + .flat_map(|keys| keys.iter()) .map(|arr| arr.get_array_memory_size()) - .sum::(); - if concat_values.is_empty() - || concat_values[0].is_empty() + .sum(); + if values_per_batch.is_empty() + || values_per_batch[0].is_empty() || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size || map.num_of_distinct_key() > config @@ -2183,7 +2154,9 @@ async fn collect_left_input( .hash_join_inlist_pushdown_max_distinct_values { PushdownStrategy::Map(Arc::clone(&map)) - } else if let Some(in_list_values) = build_struct_inlist_values(&concat_values)? { + } else if let Some(in_list_values) = + build_struct_inlist_values(&values_per_batch)? + { PushdownStrategy::InList(in_list_values) } else { PushdownStrategy::Map(Arc::clone(&map)) diff --git a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs index 0ca338265ecc6..cd4c94f1b5c2c 100644 --- a/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs +++ b/datafusion/physical-plan/src/joins/hash_join/inlist_builder.rs @@ -32,49 +32,54 @@ pub(super) fn build_struct_fields(data_types: &[DataType]) -> Result { .collect() } -/// Builds InList values from join key column arrays. +/// Builds InList values from per-batch join key column arrays. /// -/// If `join_key_arrays` is: -/// 1. A single array, let's say Int32, this will produce a flat -/// InList expression where the lookup is expected to be scalar Int32 values, -/// that is: this will produce `IN LIST (1, 2, 3)` expected to be used as `2 IN LIST (1, 2, 3)`. -/// 2. An Int32 array and a Utf8 array, this will produce a Struct InList expression -/// where the lookup is expected to be Struct values with two fields (Int32, Utf8), -/// that is: this will produce `IN LIST ((1, "a"), (2, "b"))` expected to be used as `(2, "b") IN LIST ((1, "a"), (2, "b"))`. -/// The field names of the struct are auto-generated as "c0", "c1", ... and should match the struct expression used in the join keys. +/// `values_per_batch` is indexed as `values_per_batch[batch_idx][key_idx]`. +/// +/// If there is a single key column, this concatenates the per-batch arrays into a single flat array. +/// If there are multiple key columns, this builds per-batch StructArrays and concatenates them. /// /// Note that this function does not deduplicate values - deduplication will happen later /// when building an InList expression from this array via `InListExpr::try_new_from_array`. -/// -/// Returns `None` if the estimated size exceeds `max_size_bytes` or if the number of rows -/// exceeds `max_distinct_values`. pub(super) fn build_struct_inlist_values( - join_key_arrays: &[ArrayRef], + values_per_batch: &[Vec], ) -> Result> { - // Build the source array/struct - let source_array: ArrayRef = if join_key_arrays.len() == 1 { - // Single column: use directly - Arc::clone(&join_key_arrays[0]) + if values_per_batch.is_empty() || values_per_batch[0].is_empty() { + return Ok(None); + } + + let num_keys = values_per_batch[0].len(); + + if num_keys == 1 { + // Single column: concat per-batch arrays directly + let arrays: Vec<&dyn arrow::array::Array> = values_per_batch + .iter() + .map(|keys| keys[0].as_ref()) + .collect(); + let concatenated = arrow::compute::concat(&arrays)?; + Ok(Some(concatenated)) } else { - // Multi-column: build StructArray once from all columns - let fields = build_struct_fields( - &join_key_arrays - .iter() - .map(|arr| arr.data_type().clone()) - .collect::>(), - )?; - - // Build field references with proper Arc wrapping - let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = fields + // Multi-column: build per-batch StructArrays (zero-copy wrap), then concat + let data_types: Vec = values_per_batch[0] .iter() - .cloned() - .zip(join_key_arrays.iter().cloned()) + .map(|arr| arr.data_type().clone()) .collect(); + let fields = build_struct_fields(&data_types)?; - Arc::new(StructArray::from(arrays_with_fields)) - }; + let struct_arrays: Vec = values_per_batch + .iter() + .map(|keys| { + let arrays_with_fields: Vec<(FieldRef, ArrayRef)> = + fields.iter().cloned().zip(keys.iter().cloned()).collect(); + Arc::new(StructArray::from(arrays_with_fields)) as ArrayRef + }) + .collect(); - Ok(Some(source_array)) + let refs: Vec<&dyn arrow::array::Array> = + struct_arrays.iter().map(|a| a.as_ref()).collect(); + let concatenated = arrow::compute::concat(&refs)?; + Ok(Some(concatenated)) + } } #[cfg(test)] @@ -89,22 +94,34 @@ mod tests { #[test] fn test_build_single_column_inlist_array() { let array = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef; - let result = build_struct_inlist_values(std::slice::from_ref(&array)) - .unwrap() - .unwrap(); + // Single batch with single key column + let batches = vec![vec![Arc::clone(&array)]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert!(array.eq(&result)); } + #[test] + fn test_build_single_column_multi_batch() { + let array1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let array2 = Arc::new(Int32Array::from(vec![2, 1])) as ArrayRef; + // Two batches with single key column + let batches = vec![vec![Arc::clone(&array1)], vec![Arc::clone(&array2)]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); + + let expected = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef; + assert!(expected.eq(&result)); + } + #[test] fn test_build_multi_column_inlist() { let array1 = Arc::new(Int32Array::from(vec![1, 2, 3, 2, 1])) as ArrayRef; let array2 = Arc::new(StringArray::from(vec!["a", "b", "c", "b", "a"])) as ArrayRef; - let result = build_struct_inlist_values(&[array1, array2]) - .unwrap() - .unwrap(); + // Single batch with two key columns + let batches = vec![vec![array1, array2]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert_eq!( *result.data_type(), @@ -124,9 +141,9 @@ mod tests { let int_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; - let result = build_struct_inlist_values(&[dict_array, int_array]) - .unwrap() - .unwrap(); + // Single batch with two key columns + let batches = vec![vec![dict_array, int_array]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert_eq!(result.len(), 3); assert_eq!( @@ -150,9 +167,9 @@ mod tests { let values = Arc::new(StringArray::from(vec!["foo"])); let dict_array = Arc::new(DictionaryArray::new(keys, values)) as ArrayRef; - let result = build_struct_inlist_values(std::slice::from_ref(&dict_array)) - .unwrap() - .unwrap(); + // Single batch with single key column + let batches = vec![vec![Arc::clone(&dict_array)]]; + let result = build_struct_inlist_values(&batches).unwrap().unwrap(); assert_eq!(result.len(), 3); assert_eq!(result.data_type(), dict_array.data_type()); From 46c77716545931a991b0e780de2124ac2e2c9b0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 07:04:08 +0100 Subject: [PATCH 16/26] Clippy --- datafusion/physical-plan/src/joins/hash_join/stream.rs | 3 +-- datafusion/physical-plan/src/joins/utils.rs | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 28a0c4adddf29..798ec25456522 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -736,8 +736,7 @@ impl HashJoinStream { if need_produce_result_in_final(self.join_type) { let batch_offsets = build_side.left_data.batch_offsets(); let mut bitmap = build_side.left_data.visited_indices_bitmap().lock(); - left_indices.iter().flatten().for_each(|x| { - let packed = x as u64; + left_indices.iter().flatten().for_each(|packed| { let batch_idx = (packed >> 32) as usize; let row_idx = (packed & 0xFFFFFFFF) as usize; let flat = batch_offsets[batch_idx] + row_idx; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e5002ccac1cf6..45e37612f252e 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -910,6 +910,7 @@ pub(crate) fn get_final_indices_from_bit_map( (left_indices, right_indices) } +#[expect(clippy::too_many_arguments)] pub(crate) fn apply_join_filter_to_indices( build_batches: &[RecordBatch], probe_batch: &RecordBatch, @@ -1878,7 +1879,7 @@ pub(super) fn equal_rows_arr( let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; - for key_idx in 1..num_keys { + for (key_idx, ) in right_arrays.iter().enumerate().take(num_keys).skip(1) { let arr_left = gather_left_key(key_idx)?; let arr_right = take(right_arrays[key_idx].as_ref(), indices_right, None)?; let eq_result = From 56c04d5cb6134f13576cee81da6a7e3f8e6047c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 07:16:39 +0100 Subject: [PATCH 17/26] WIP --- datafusion/physical-plan/src/joins/utils.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 45e37612f252e..58208cf18c808 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1879,8 +1879,7 @@ pub(super) fn equal_rows_arr( let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; - for (key_idx, ) in right_arrays.iter().enumerate().take(num_keys).skip(1) { - let arr_left = gather_left_key(key_idx)?; + for (key_idx, arr_left) in right_arrays.iter().enumerate().take(num_keys).skip(1) { let arr_right = take(right_arrays[key_idx].as_ref(), indices_right, None)?; let eq_result = eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)?; From 114e182e5836d6ac3bd7296e44c2bef8ada450c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sun, 15 Mar 2026 07:32:58 +0100 Subject: [PATCH 18/26] WIP --- datafusion/physical-plan/src/joins/utils.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 58208cf18c808..78cee9e378ad4 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1879,8 +1879,9 @@ pub(super) fn equal_rows_arr( let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; - for (key_idx, arr_left) in right_arrays.iter().enumerate().take(num_keys).skip(1) { - let arr_right = take(right_arrays[key_idx].as_ref(), indices_right, None)?; + for (key_idx, right_key_array) in right_arrays.iter().enumerate().take(num_keys).skip(1) { + let arr_left = gather_left_key(key_idx)?; + let arr_right = take(right_key_array.as_ref(), indices_right, None)?; let eq_result = eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)?; equal = and(&equal, &eq_result)?; From aefebecd87209bdc905fff9ddbdbfac830b41e5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 08:46:00 +0100 Subject: [PATCH 19/26] Use take instead of interleave for single-batch build side in hash join When the build side has only one batch, use `take` directly instead of `interleave`, avoiding the overhead of multi-array dispatch and extra (batch_idx, row_idx) allocations. This is the common case for CollectLeft mode and fixes a regression in TPC-DS Q74. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/joins/utils.rs | 122 +++++++++++++------- 1 file changed, 79 insertions(+), 43 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 78cee9e378ad4..0d9b86fd88bbd 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -992,6 +992,26 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result Result { + if let Some(idx_nulls) = build_indices.nulls() { + let data = result.to_data(); + let combined_nulls = if let Some(existing) = data.nulls() { + NullBuffer::new(existing.inner() & idx_nulls.inner()) + } else { + idx_nulls.clone() + }; + Ok(arrow::array::make_array( + data.into_builder() + .null_bit_buffer(Some(combined_nulls.into_inner().into_inner())) + .build()?, + )) + } else { + Ok(result) + } +} + /// Decode packed composite indices into (batch_index, row_index) pairs. /// /// Each u64 value encodes `(batch_idx << 32) | row_idx`. @@ -1033,13 +1053,31 @@ pub(crate) fn build_batch_from_indices( return new_empty_schema_batch(schema, row_count); } - // Pre-compute interleave indices for build-side columns (shared across all build columns) - let interleave_indices = if build_batches.is_empty() + // Determine build-side gathering strategy: + // - Single batch: use `take` directly (packed value == row index since batch_idx is 0) + // - Multiple batches: use `interleave` with decoded (batch_idx, row_idx) pairs + // - No batches or all-null indices: emit null arrays + enum BuildGather { + /// Single build batch — use `take` with row indices cast to u32 + SingleBatch(UInt32Array), + /// Multiple build batches — use `interleave` with (batch_idx, row_idx) pairs + MultiBatch(Vec<(usize, usize)>), + /// All build indices are null (outer join with no matches) + AllNull, + } + + let build_gather = if build_batches.is_empty() || build_indices.null_count() == build_indices.len() { - None + BuildGather::AllNull + } else if build_batches.len() == 1 { + // Single batch: packed value == row index (batch_idx is 0), use take directly + // Preserve the null buffer from build_indices for outer joins + let values: Vec = build_indices.values().iter().map(|&v| v as u32).collect(); + let row_indices = UInt32Array::new(values.into(), build_indices.nulls().cloned()); + BuildGather::SingleBatch(row_indices) } else { - Some(packed_indices_to_interleave(build_indices)) + BuildGather::MultiBatch(packed_indices_to_interleave(build_indices)) }; let mut columns: Vec> = Vec::with_capacity(schema.fields().len()); @@ -1048,48 +1086,44 @@ pub(crate) fn build_batch_from_indices( let array = if column_index.side == JoinSide::None { Arc::new(compute::is_not_null(probe_indices)?) } else if column_index.side == build_side { - if let Some(ref il_indices) = interleave_indices { - // Gather column arrays from all build batches - let arrays: Vec<&dyn Array> = build_batches - .iter() - .map(|b| b.column(column_index.index).as_ref()) - .collect(); - let result = compute::interleave(&arrays, il_indices)?; - // Apply null mask from build_indices (for outer joins where - // unmatched rows are represented as null indices) - if build_indices.null_count() > 0 { - if let Some(idx_nulls) = build_indices.nulls() { - let data = result.to_data(); - let combined_nulls = if let Some(existing) = data.nulls() { - NullBuffer::new(existing.inner() & idx_nulls.inner()) - } else { - idx_nulls.clone() - }; - arrow::array::make_array( - data.into_builder() - .null_bit_buffer(Some( - combined_nulls.into_inner().into_inner(), - )) - .build()?, - ) + match &build_gather { + BuildGather::SingleBatch(row_indices) => { + // take() already handles null indices (produces null output), + // so no extra null mask needed + take( + build_batches[0].column(column_index.index).as_ref(), + row_indices, + None, + )? + } + BuildGather::MultiBatch(il_indices) => { + // Gather column arrays from all build batches + let arrays: Vec<&dyn Array> = build_batches + .iter() + .map(|b| b.column(column_index.index).as_ref()) + .collect(); + let result = compute::interleave(&arrays, il_indices)?; + // Apply null mask from build_indices (for outer joins where + // unmatched rows are represented as null indices) + if build_indices.null_count() > 0 { + apply_null_mask(result, build_indices)? } else { result } - } else { - result } - } else { - // All build indices are null (outer join with no matches) - // Use the build batch schema to get the correct data type - // (column_index.index refers to the build batch columns, not the output schema) - let data_type = if let Some(b) = build_batches.first() { - b.column(column_index.index).data_type().clone() - } else { - // No build batches available (empty build side in CollectLeft mode). - // Use the output schema field at the current column position. - schema.field(columns.len()).data_type().clone() - }; - new_null_array(&data_type, build_indices.len()) + BuildGather::AllNull => { + // All build indices are null (outer join with no matches) + // Use the build batch schema to get the correct data type + // (column_index.index refers to the build batch columns, not the output schema) + let data_type = if let Some(b) = build_batches.first() { + b.column(column_index.index).data_type().clone() + } else { + // No build batches available (empty build side in CollectLeft mode). + // Use the output schema field at the current column position. + schema.field(columns.len()).data_type().clone() + }; + new_null_array(&data_type, build_indices.len()) + } } } else { let array = probe_batch.column(column_index.index); @@ -1879,7 +1913,9 @@ pub(super) fn equal_rows_arr( let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; - for (key_idx, right_key_array) in right_arrays.iter().enumerate().take(num_keys).skip(1) { + for (key_idx, right_key_array) in + right_arrays.iter().enumerate().take(num_keys).skip(1) + { let arr_left = gather_left_key(key_idx)?; let arr_right = take(right_key_array.as_ref(), indices_right, None)?; let eq_result = From acde8fee9a9cae1dd5152db04aeb385dc83ca5d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 08:52:48 +0100 Subject: [PATCH 20/26] Simplify apply_null_mask and restore equal_rows_arr empty behavior - Use NullBuffer::union + build_unchecked in apply_null_mask - Remove redundant null_count guard at call site - Accept Option<&NullBuffer> instead of &UInt64Array - Restore equal_rows_arr to return empty arrays instead of error Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/joins/utils.rs | 39 ++++++++------------- 1 file changed, 15 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 0d9b86fd88bbd..e5d0558bbde01 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -992,23 +992,20 @@ fn new_empty_schema_batch(schema: &Schema, row_count: usize) -> Result Result { - if let Some(idx_nulls) = build_indices.nulls() { - let data = result.to_data(); - let combined_nulls = if let Some(existing) = data.nulls() { - NullBuffer::new(existing.inner() & idx_nulls.inner()) - } else { - idx_nulls.clone() - }; - Ok(arrow::array::make_array( - data.into_builder() - .null_bit_buffer(Some(combined_nulls.into_inner().into_inner())) - .build()?, - )) - } else { - Ok(result) +fn apply_null_mask(result: ArrayRef, index_nulls: Option<&NullBuffer>) -> ArrayRef { + let combined = NullBuffer::union(result.nulls(), index_nulls); + // SAFETY: We only modify the null buffer, which is the union of the existing nulls + // and the index nulls. All other array data (buffers, offsets, child data) is unchanged. + unsafe { + arrow::array::make_array( + result + .into_data() + .into_builder() + .nulls(combined) + .build_unchecked(), + ) } } @@ -1105,11 +1102,7 @@ pub(crate) fn build_batch_from_indices( let result = compute::interleave(&arrays, il_indices)?; // Apply null mask from build_indices (for outer joins where // unmatched rows are represented as null indices) - if build_indices.null_count() > 0 { - apply_null_mask(result, build_indices)? - } else { - result - } + apply_null_mask(result, build_indices.nulls()) } BuildGather::AllNull => { // All build indices are null (outer join with no matches) @@ -1870,9 +1863,7 @@ pub(super) fn equal_rows_arr( null_equality: NullEquality, ) -> Result<(UInt64Array, UInt32Array)> { if left_arrays_per_batch.is_empty() || right_arrays.is_empty() { - return Err(DataFusionError::Internal( - "At least one array should be provided for both left and right".to_string(), - )); + return Ok((Vec::::new().into(), Vec::::new().into())); } let num_keys = right_arrays.len(); From e2043ad6d901151ff96b269f358134338389fdc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 09:03:52 +0100 Subject: [PATCH 21/26] Optimize equal_rows_arr with element-wise comparison Avoid allocating intermediate arrays (take+eq+and) per key column in hash collision resolution. For common types (primitives, strings), compare values element-wise using a mutable BooleanBufferBuilder. Falls back to take+eq for nested/unsupported types. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/joins/utils.rs | 246 ++++++++++++++++---- 1 file changed, 201 insertions(+), 45 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e5d0558bbde01..87216896cd58d 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -41,9 +41,9 @@ pub use crate::joins::{JoinOn, JoinOnRef}; use ahash::RandomState; use arrow::array::{ - Array, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, PrimitiveArray, - RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, UInt64Array, - builder::UInt64Builder, downcast_array, new_null_array, + Array, ArrayAccessor, ArrowPrimitiveType, BooleanBufferBuilder, NativeAdapter, + PrimitiveArray, RecordBatch, RecordBatchOptions, UInt32Array, UInt32Builder, + UInt64Array, builder::UInt64Builder, downcast_array, new_null_array, }; use arrow::array::{ ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array, @@ -54,7 +54,7 @@ use arrow::array::{ }; use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::compute::kernels::cmp::eq; -use arrow::compute::{self, FilterBuilder, and, take}; +use arrow::compute::{self, FilterBuilder, take}; use arrow::datatypes::{ ArrowNativeType, Field, Schema, SchemaBuilder, UInt32Type, UInt64Type, }; @@ -1866,54 +1866,77 @@ pub(super) fn equal_rows_arr( return Ok((Vec::::new().into(), Vec::::new().into())); } + let num_rows = indices_left.len(); let num_keys = right_arrays.len(); + let left_indices = indices_left.values(); + let right_indices = indices_right.values(); - // Gather left-side key values using packed indices - let gather_left_key = |key_idx: usize| -> Result { - if left_arrays_per_batch.len() == 1 { - // Single batch: packed value == row index (batch_idx is 0), - // so we can use take directly with UInt32 indices. - let row_indices: UInt32Array = - indices_left.values().iter().map(|&v| v as u32).collect(); - Ok(take( - left_arrays_per_batch[0][key_idx].as_ref(), - &row_indices, - None, - )?) - } else { - // Multiple batches: decode packed indices and use interleave - let arrays: Vec<&dyn Array> = left_arrays_per_batch - .iter() - .map(|batch_keys| batch_keys[key_idx].as_ref()) - .collect(); - let il_indices: Vec<(usize, usize)> = indices_left - .values() - .iter() - .map(|&packed| { - let batch_idx = (packed >> 32) as usize; - let row_idx = (packed & 0xFFFFFFFF) as usize; - (batch_idx, row_idx) - }) - .collect(); - Ok(compute::interleave(&arrays, &il_indices)?) - } - }; + // Build a mutable bitmap: start all-true, clear bits where keys don't match. + // This avoids allocating intermediate arrays via take+eq+and per key column. + let mut equal_bits = BooleanBufferBuilder::new(num_rows); + equal_bits.append_n(num_rows, true); - let arr_left = gather_left_key(0)?; - let arr_right = take(right_arrays[0].as_ref(), indices_right, None)?; + let single_batch = left_arrays_per_batch.len() == 1; - let mut equal: BooleanArray = eq_dyn_null(&arr_left, &arr_right, null_equality)?; + for key_idx in 0..num_keys { + let right_array = &right_arrays[key_idx]; - for (key_idx, right_key_array) in - right_arrays.iter().enumerate().take(num_keys).skip(1) - { - let arr_left = gather_left_key(key_idx)?; - let arr_right = take(right_key_array.as_ref(), indices_right, None)?; - let eq_result = - eq_dyn_null(arr_left.as_ref(), arr_right.as_ref(), null_equality)?; - equal = and(&equal, &eq_result)?; + // For single-batch builds, try element-wise comparison which avoids + // allocating intermediate arrays via take+eq per key column. + // Falls back to take+eq for nested/unsupported types or multi-batch. + let handled = single_batch + && compare_rows_elementwise( + &mut equal_bits, + left_indices, + right_indices, + &left_arrays_per_batch[0][key_idx], + right_array, + null_equality, + |packed| packed as usize, + ); + + if !handled { + // Fallback: materialize via take/interleave, then eq + let arr_left = if single_batch { + let row_indices: UInt32Array = + left_indices.iter().map(|&v| v as u32).collect(); + take( + left_arrays_per_batch[0][key_idx].as_ref(), + &row_indices, + None, + )? + } else { + let arrays: Vec<&dyn Array> = left_arrays_per_batch + .iter() + .map(|batch_keys| batch_keys[key_idx].as_ref()) + .collect(); + let il_indices: Vec<(usize, usize)> = left_indices + .iter() + .map(|&packed| { + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + (batch_idx, row_idx) + }) + .collect(); + compute::interleave(&arrays, &il_indices)? + }; + let arr_right = take(right_array.as_ref(), indices_right, None)?; + let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?; + // AND the result into our mutable bitmap. + // Null positions in eq_result are treated as "not equal". + let eq_values = eq_result.values(); + let eq_nulls = eq_result.nulls(); + for i in 0..num_rows { + if equal_bits.get_bit(i) + && (!eq_values.value(i) || eq_nulls.is_some_and(|n| !n.is_valid(i))) + { + equal_bits.set_bit(i, false); + } + } + } } + let equal = BooleanArray::new(equal_bits.finish(), None); let filter_builder = FilterBuilder::new(&equal).optimize().build(); let left_filtered = filter_builder.filter(indices_left)?; @@ -1925,6 +1948,139 @@ pub(super) fn equal_rows_arr( )) } +/// Compare rows element-wise without materializing intermediate arrays. +/// Returns `true` if the comparison was handled, `false` if fallback is needed. +/// +/// Clears bits in `equal_bits` where the left and right values at the +/// indexed positions are not equal (respecting `null_equality`). +fn compare_rows_elementwise( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left_array: &ArrayRef, + right_array: &ArrayRef, + null_equality: NullEquality, + left_index_to_row: impl Fn(u64) -> usize, +) -> bool { + // Nested types need special comparison logic, fall back + if left_array.data_type().is_nested() { + return false; + } + + macro_rules! compare_elementwise { + ($array_type:ty) => {{ + let left = left_array.as_any().downcast_ref::<$array_type>().unwrap(); + let right = right_array.as_any().downcast_ref::<$array_type>().unwrap(); + do_compare_elementwise( + equal_bits, + left_indices, + right_indices, + &left, + &right, + null_equality, + &left_index_to_row, + ); + }}; + } + + match left_array.data_type() { + DataType::Null => { + match null_equality { + NullEquality::NullEqualsNothing => { + // null != null, clear all bits + for i in 0..left_indices.len() { + equal_bits.set_bit(i, false); + } + } + NullEquality::NullEqualsNull => {} // null == null, keep bits + } + } + DataType::Boolean => compare_elementwise!(BooleanArray), + DataType::Int8 => compare_elementwise!(Int8Array), + DataType::Int16 => compare_elementwise!(Int16Array), + DataType::Int32 => compare_elementwise!(Int32Array), + DataType::Int64 => compare_elementwise!(Int64Array), + DataType::UInt8 => compare_elementwise!(UInt8Array), + DataType::UInt16 => compare_elementwise!(UInt16Array), + DataType::UInt32 => compare_elementwise!(UInt32Array), + DataType::UInt64 => compare_elementwise!(UInt64Array), + DataType::Float32 => compare_elementwise!(Float32Array), + DataType::Float64 => compare_elementwise!(Float64Array), + DataType::Binary => compare_elementwise!(BinaryArray), + DataType::BinaryView => compare_elementwise!(BinaryViewArray), + DataType::FixedSizeBinary(_) => compare_elementwise!(FixedSizeBinaryArray), + DataType::LargeBinary => compare_elementwise!(LargeBinaryArray), + DataType::Utf8 => compare_elementwise!(StringArray), + DataType::Utf8View => compare_elementwise!(StringViewArray), + DataType::LargeUtf8 => compare_elementwise!(LargeStringArray), + DataType::Decimal128(..) => compare_elementwise!(Decimal128Array), + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => compare_elementwise!(TimestampSecondArray), + TimeUnit::Millisecond => compare_elementwise!(TimestampMillisecondArray), + TimeUnit::Microsecond => compare_elementwise!(TimestampMicrosecondArray), + TimeUnit::Nanosecond => compare_elementwise!(TimestampNanosecondArray), + }, + DataType::Date32 => compare_elementwise!(Date32Array), + DataType::Date64 => compare_elementwise!(Date64Array), + _ => return false, // Unsupported type, use fallback + } + true +} + +/// Inner loop for element-wise comparison. Generic over array type via `ArrayAccessor`. +/// Compares `left.value(left_index_to_row(left_indices[i]))` against +/// `right.value(right_indices[i])` for each row, clearing bits that don't match. +fn do_compare_elementwise( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left: &A, + right: &A, + null_equality: NullEquality, + left_index_to_row: &dyn Fn(u64) -> usize, +) where + A::Item: PartialEq, +{ + let left_nulls = left.nulls(); + let right_nulls = right.nulls(); + let has_nulls = left.null_count() > 0 || right.null_count() > 0; + + let num_rows = left_indices.len(); + + if !has_nulls { + // Fast path: no nulls, just compare values directly + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let l_idx = left_index_to_row(left_indices[i]); + let r_idx = right_indices[i] as usize; + if left.value(l_idx) != right.value(r_idx) { + equal_bits.set_bit(i, false); + } + } + } else { + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let l_idx = left_index_to_row(left_indices[i]); + let r_idx = right_indices[i] as usize; + let l_null = left_nulls.is_some_and(|nulls| !nulls.is_valid(l_idx)); + let r_null = right_nulls.is_some_and(|nulls| !nulls.is_valid(r_idx)); + + let is_equal = match (l_null, r_null) { + (true, true) => null_equality == NullEquality::NullEqualsNull, + (true, false) | (false, true) => false, + (false, false) => left.value(l_idx) == right.value(r_idx), + }; + if !is_equal { + equal_bits.set_bit(i, false); + } + } + } +} + // version of eq_dyn supporting equality on null arrays fn eq_dyn_null( left: &dyn Array, From 97b02034934a59cbb2595ece7e45c440707f9dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 09:17:01 +0100 Subject: [PATCH 22/26] Simplify equal_rows_arr: remove dyn Fn, hoist indices, use bitwise AND - Remove left_index_to_row closure parameter (only single-batch uses element-wise path, where packed == row index) - Hoist fallback row_indices and il_indices outside the key loop (lazy-allocated, computed once, reused across keys) - Replace scalar bitmap AND loop with word-level byte-slice AND via and_bitmap_with_boolean_buffer helper Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/joins/utils.rs | 78 +++++++++++++-------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 87216896cd58d..3a342a562ed9b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1137,7 +1137,7 @@ pub(crate) fn build_batch_from_indices( /// The resulting batch has [Schema] `schema`. pub(crate) fn build_batch_empty_build_side( schema: &Schema, - build_batch: &RecordBatch, + left_schema: &Schema, probe_batch: &RecordBatch, column_indices: &[ColumnIndex], join_type: JoinType, @@ -1165,7 +1165,7 @@ pub(crate) fn build_batch_empty_build_side( let array = match column_index.side { // left -> null array JoinSide::Left => new_null_array( - build_batch.column(column_index.index).data_type(), + left_schema.field(column_index.index).data_type(), num_rows, ), // right -> respective right array @@ -1878,6 +1878,11 @@ pub(super) fn equal_rows_arr( let single_batch = left_arrays_per_batch.len() == 1; + // Pre-compute fallback index arrays once (used across all keys that need fallback). + // Lazy: only allocated if a key actually needs the fallback path. + let mut fallback_row_indices: Option = None; + let mut fallback_il_indices: Option> = None; + for key_idx in 0..num_keys { let right_array = &right_arrays[key_idx]; @@ -1892,46 +1897,44 @@ pub(super) fn equal_rows_arr( &left_arrays_per_batch[0][key_idx], right_array, null_equality, - |packed| packed as usize, ); if !handled { // Fallback: materialize via take/interleave, then eq let arr_left = if single_batch { - let row_indices: UInt32Array = - left_indices.iter().map(|&v| v as u32).collect(); + let row_indices = fallback_row_indices.get_or_insert_with(|| { + left_indices.iter().map(|&v| v as u32).collect() + }); take( left_arrays_per_batch[0][key_idx].as_ref(), - &row_indices, + row_indices, None, )? } else { + let il_indices = fallback_il_indices.get_or_insert_with(|| { + left_indices + .iter() + .map(|&packed| { + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + (batch_idx, row_idx) + }) + .collect() + }); let arrays: Vec<&dyn Array> = left_arrays_per_batch .iter() .map(|batch_keys| batch_keys[key_idx].as_ref()) .collect(); - let il_indices: Vec<(usize, usize)> = left_indices - .iter() - .map(|&packed| { - let batch_idx = (packed >> 32) as usize; - let row_idx = (packed & 0xFFFFFFFF) as usize; - (batch_idx, row_idx) - }) - .collect(); - compute::interleave(&arrays, &il_indices)? + compute::interleave(&arrays, il_indices)? }; let arr_right = take(right_array.as_ref(), indices_right, None)?; let eq_result = eq_dyn_null(&arr_left, &arr_right, null_equality)?; // AND the result into our mutable bitmap. - // Null positions in eq_result are treated as "not equal". - let eq_values = eq_result.values(); - let eq_nulls = eq_result.nulls(); - for i in 0..num_rows { - if equal_bits.get_bit(i) - && (!eq_values.value(i) || eq_nulls.is_some_and(|n| !n.is_valid(i))) - { - equal_bits.set_bit(i, false); - } + // Null positions in eq_result are treated as "not equal": + // first AND with the values, then AND with the validity bitmap. + and_bitmap_with_boolean_buffer(&mut equal_bits, eq_result.values()); + if let Some(eq_nulls) = eq_result.nulls() { + and_bitmap_with_boolean_buffer(&mut equal_bits, eq_nulls.inner()); } } } @@ -1948,9 +1951,23 @@ pub(super) fn equal_rows_arr( )) } +/// AND a `BooleanBuffer` into a `BooleanBufferBuilder` in-place using +/// word-level operations on the underlying byte slices. +fn and_bitmap_with_boolean_buffer( + builder: &mut BooleanBufferBuilder, + rhs: &BooleanBuffer, +) { + let lhs = builder.as_slice_mut(); + let rhs = rhs.inner().as_slice(); + for (l, r) in lhs.iter_mut().zip(rhs.iter()) { + *l &= r; + } +} + /// Compare rows element-wise without materializing intermediate arrays. /// Returns `true` if the comparison was handled, `false` if fallback is needed. /// +/// Only works for single-batch builds where packed index == row index. /// Clears bits in `equal_bits` where the left and right values at the /// indexed positions are not equal (respecting `null_equality`). fn compare_rows_elementwise( @@ -1960,7 +1977,6 @@ fn compare_rows_elementwise( left_array: &ArrayRef, right_array: &ArrayRef, null_equality: NullEquality, - left_index_to_row: impl Fn(u64) -> usize, ) -> bool { // Nested types need special comparison logic, fall back if left_array.data_type().is_nested() { @@ -1978,7 +1994,6 @@ fn compare_rows_elementwise( &left, &right, null_equality, - &left_index_to_row, ); }}; } @@ -2028,8 +2043,10 @@ fn compare_rows_elementwise( } /// Inner loop for element-wise comparison. Generic over array type via `ArrayAccessor`. -/// Compares `left.value(left_index_to_row(left_indices[i]))` against +/// Compares `left.value(left_indices[i] as usize)` against /// `right.value(right_indices[i])` for each row, clearing bits that don't match. +/// +/// Only valid for single-batch builds where packed index == row index. fn do_compare_elementwise( equal_bits: &mut BooleanBufferBuilder, left_indices: &[u64], @@ -2037,7 +2054,6 @@ fn do_compare_elementwise( left: &A, right: &A, null_equality: NullEquality, - left_index_to_row: &dyn Fn(u64) -> usize, ) where A::Item: PartialEq, { @@ -2053,7 +2069,7 @@ fn do_compare_elementwise( if !equal_bits.get_bit(i) { continue; } - let l_idx = left_index_to_row(left_indices[i]); + let l_idx = left_indices[i] as usize; let r_idx = right_indices[i] as usize; if left.value(l_idx) != right.value(r_idx) { equal_bits.set_bit(i, false); @@ -2064,7 +2080,7 @@ fn do_compare_elementwise( if !equal_bits.get_bit(i) { continue; } - let l_idx = left_index_to_row(left_indices[i]); + let l_idx = left_indices[i] as usize; let r_idx = right_indices[i] as usize; let l_null = left_nulls.is_some_and(|nulls| !nulls.is_valid(l_idx)); let r_null = right_nulls.is_some_and(|nulls| !nulls.is_valid(r_idx)); @@ -3205,7 +3221,7 @@ mod tests { let result = build_batch_empty_build_side( &empty_schema, - &build_batch, + &build_batch.schema(), &probe_batch, &[], // no column indices with empty projection JoinType::Right, From 9986fb0dee31f10d8813fb2a210d685058b32321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 09:41:00 +0100 Subject: [PATCH 23/26] Fix --- datafusion/physical-plan/src/joins/hash_join/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 798ec25456522..f3eed84ce8e7b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -653,7 +653,7 @@ impl HashJoinStream { if is_empty && self.filter.is_none() { let result = build_batch_empty_build_side( &self.schema, - &build_side.left_data.batch(), + &build_side.left_data.batch().schema(), &state.batch, &self.column_indices, self.join_type, From 42aa7b22e97bba59e68798de47fdc521875abe2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 10:23:28 +0100 Subject: [PATCH 24/26] Extend element-wise comparison to multi-batch builds in equal_rows_arr The multi-batch path in equal_rows_arr was using interleave() to materialize intermediate arrays, which is slower than the old take()-on-concatenated-keys approach. This adds compare_rows_elementwise_multi which decodes packed indices (batch_idx << 32 | row_idx) and compares values directly from per-batch arrays, avoiding all intermediate allocation for common key types. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/joins/utils.rs | 161 +++++++++++++++++++- 1 file changed, 155 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 3a342a562ed9b..2df27043d0713 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1886,18 +1886,29 @@ pub(super) fn equal_rows_arr( for key_idx in 0..num_keys { let right_array = &right_arrays[key_idx]; - // For single-batch builds, try element-wise comparison which avoids - // allocating intermediate arrays via take+eq per key column. - // Falls back to take+eq for nested/unsupported types or multi-batch. - let handled = single_batch - && compare_rows_elementwise( + // Try element-wise comparison which avoids allocating intermediate arrays. + // Works for both single-batch and multi-batch builds on common types. + // Falls back to take/interleave+eq for nested/unsupported types. + let handled = if single_batch { + compare_rows_elementwise( &mut equal_bits, left_indices, right_indices, &left_arrays_per_batch[0][key_idx], right_array, null_equality, - ); + ) + } else { + compare_rows_elementwise_multi( + &mut equal_bits, + left_indices, + right_indices, + left_arrays_per_batch, + key_idx, + right_array, + null_equality, + ) + }; if !handled { // Fallback: materialize via take/interleave, then eq @@ -2097,6 +2108,144 @@ fn do_compare_elementwise( } } +/// Compare rows element-wise for multi-batch builds without materializing +/// intermediate arrays. Decodes packed indices to (batch_idx, row_idx) and +/// accesses the correct batch array directly. +/// Returns `true` if the comparison was handled, `false` if fallback is needed. +fn compare_rows_elementwise_multi( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left_arrays_per_batch: &[Vec], + key_idx: usize, + right_array: &ArrayRef, + null_equality: NullEquality, +) -> bool { + if right_array.data_type().is_nested() { + return false; + } + + macro_rules! compare_multi { + ($array_type:ty) => {{ + let left_typed: Vec<&$array_type> = left_arrays_per_batch + .iter() + .map(|keys| keys[key_idx].as_any().downcast_ref::<$array_type>().unwrap()) + .collect(); + let right = right_array.as_any().downcast_ref::<$array_type>().unwrap(); + do_compare_elementwise_multi( + equal_bits, + left_indices, + right_indices, + &left_typed, + &right, + null_equality, + ); + }}; + } + + match right_array.data_type() { + DataType::Null => { + match null_equality { + NullEquality::NullEqualsNothing => { + for i in 0..left_indices.len() { + equal_bits.set_bit(i, false); + } + } + NullEquality::NullEqualsNull => {} + } + } + DataType::Boolean => compare_multi!(BooleanArray), + DataType::Int8 => compare_multi!(Int8Array), + DataType::Int16 => compare_multi!(Int16Array), + DataType::Int32 => compare_multi!(Int32Array), + DataType::Int64 => compare_multi!(Int64Array), + DataType::UInt8 => compare_multi!(UInt8Array), + DataType::UInt16 => compare_multi!(UInt16Array), + DataType::UInt32 => compare_multi!(UInt32Array), + DataType::UInt64 => compare_multi!(UInt64Array), + DataType::Float32 => compare_multi!(Float32Array), + DataType::Float64 => compare_multi!(Float64Array), + DataType::Binary => compare_multi!(BinaryArray), + DataType::BinaryView => compare_multi!(BinaryViewArray), + DataType::FixedSizeBinary(_) => compare_multi!(FixedSizeBinaryArray), + DataType::LargeBinary => compare_multi!(LargeBinaryArray), + DataType::Utf8 => compare_multi!(StringArray), + DataType::Utf8View => compare_multi!(StringViewArray), + DataType::LargeUtf8 => compare_multi!(LargeStringArray), + DataType::Decimal128(..) => compare_multi!(Decimal128Array), + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => compare_multi!(TimestampSecondArray), + TimeUnit::Millisecond => compare_multi!(TimestampMillisecondArray), + TimeUnit::Microsecond => compare_multi!(TimestampMicrosecondArray), + TimeUnit::Nanosecond => compare_multi!(TimestampNanosecondArray), + }, + DataType::Date32 => compare_multi!(Date32Array), + DataType::Date64 => compare_multi!(Date64Array), + _ => return false, + } + true +} + +/// Inner loop for multi-batch element-wise comparison. +/// Decodes packed indices to access the correct batch array per row. +/// +/// Takes `&[A]` where `A` is a reference type like `&Int32Array` that implements +/// `ArrayAccessor`. Null checking works through auto-deref to the `Array` trait. +fn do_compare_elementwise_multi( + equal_bits: &mut BooleanBufferBuilder, + left_indices: &[u64], + right_indices: &[u32], + left_arrays: &[A], + right: &A, + null_equality: NullEquality, +) where + A::Item: PartialEq, +{ + let right_nulls = right.nulls(); + let has_nulls = right.null_count() > 0 + || left_arrays + .iter() + .any(|a| a.null_count() > 0); + let num_rows = left_indices.len(); + + if !has_nulls { + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let packed = left_indices[i]; + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + let r_idx = right_indices[i] as usize; + if left_arrays[batch_idx].value(row_idx) != right.value(r_idx) { + equal_bits.set_bit(i, false); + } + } + } else { + for i in 0..num_rows { + if !equal_bits.get_bit(i) { + continue; + } + let packed = left_indices[i]; + let batch_idx = (packed >> 32) as usize; + let row_idx = (packed & 0xFFFFFFFF) as usize; + let r_idx = right_indices[i] as usize; + let left = &left_arrays[batch_idx]; + let l_null = left.nulls().is_some_and(|n| !n.is_valid(row_idx)); + let r_null = right_nulls.is_some_and(|n| !n.is_valid(r_idx)); + + let is_equal = match (l_null, r_null) { + (true, true) => null_equality == NullEquality::NullEqualsNull, + (true, false) | (false, true) => false, + (false, false) => left.value(row_idx) == right.value(r_idx), + }; + if !is_equal { + equal_bits.set_bit(i, false); + } + } + } +} + // version of eq_dyn supporting equality on null arrays fn eq_dyn_null( left: &dyn Array, From 7f5db8aef575ff0c8571e2da9f9ee975f1c6c9ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 10:27:30 +0100 Subject: [PATCH 25/26] Simplify: extract shared dispatch_elementwise macro, clean up multi-batch API - Extract duplicated DataType match block into dispatch_elementwise! macro used by both single-batch and multi-batch comparison functions - Change compare_rows_elementwise_multi to take &[&ArrayRef] instead of leaking (left_arrays_per_batch, key_idx) internal structure - Pre-compute left null buffers per batch outside the hot loop in do_compare_elementwise_multi Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/joins/utils.rs | 158 +++++++++----------- 1 file changed, 67 insertions(+), 91 deletions(-) diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 2df27043d0713..eeae0efa5fe1c 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1899,12 +1899,15 @@ pub(super) fn equal_rows_arr( null_equality, ) } else { + let left_arrays_for_key: Vec<&ArrayRef> = left_arrays_per_batch + .iter() + .map(|batch_keys| &batch_keys[key_idx]) + .collect(); compare_rows_elementwise_multi( &mut equal_bits, left_indices, right_indices, - left_arrays_per_batch, - key_idx, + &left_arrays_for_key, right_array, null_equality, ) @@ -1975,6 +1978,54 @@ fn and_bitmap_with_boolean_buffer( } } +/// Dispatch a macro call by Arrow DataType for element-wise comparison. +/// The `$action` macro is invoked with the concrete array type for each supported type. +/// Returns `false` for unsupported/nested types (caller should use fallback). +macro_rules! dispatch_elementwise { + ($data_type:expr, $equal_bits:expr, $left_indices:expr, $null_equality:expr, $action:ident) => { + match $data_type { + DataType::Null => { + match $null_equality { + NullEquality::NullEqualsNothing => { + for i in 0..$left_indices.len() { + $equal_bits.set_bit(i, false); + } + } + NullEquality::NullEqualsNull => {} + } + } + DataType::Boolean => $action!(BooleanArray), + DataType::Int8 => $action!(Int8Array), + DataType::Int16 => $action!(Int16Array), + DataType::Int32 => $action!(Int32Array), + DataType::Int64 => $action!(Int64Array), + DataType::UInt8 => $action!(UInt8Array), + DataType::UInt16 => $action!(UInt16Array), + DataType::UInt32 => $action!(UInt32Array), + DataType::UInt64 => $action!(UInt64Array), + DataType::Float32 => $action!(Float32Array), + DataType::Float64 => $action!(Float64Array), + DataType::Binary => $action!(BinaryArray), + DataType::BinaryView => $action!(BinaryViewArray), + DataType::FixedSizeBinary(_) => $action!(FixedSizeBinaryArray), + DataType::LargeBinary => $action!(LargeBinaryArray), + DataType::Utf8 => $action!(StringArray), + DataType::Utf8View => $action!(StringViewArray), + DataType::LargeUtf8 => $action!(LargeStringArray), + DataType::Decimal128(..) => $action!(Decimal128Array), + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => $action!(TimestampSecondArray), + TimeUnit::Millisecond => $action!(TimestampMillisecondArray), + TimeUnit::Microsecond => $action!(TimestampMicrosecondArray), + TimeUnit::Nanosecond => $action!(TimestampNanosecondArray), + }, + DataType::Date32 => $action!(Date32Array), + DataType::Date64 => $action!(Date64Array), + _ => return false, + } + }; +} + /// Compare rows element-wise without materializing intermediate arrays. /// Returns `true` if the comparison was handled, `false` if fallback is needed. /// @@ -1989,7 +2040,6 @@ fn compare_rows_elementwise( right_array: &ArrayRef, null_equality: NullEquality, ) -> bool { - // Nested types need special comparison logic, fall back if left_array.data_type().is_nested() { return false; } @@ -2009,47 +2059,7 @@ fn compare_rows_elementwise( }}; } - match left_array.data_type() { - DataType::Null => { - match null_equality { - NullEquality::NullEqualsNothing => { - // null != null, clear all bits - for i in 0..left_indices.len() { - equal_bits.set_bit(i, false); - } - } - NullEquality::NullEqualsNull => {} // null == null, keep bits - } - } - DataType::Boolean => compare_elementwise!(BooleanArray), - DataType::Int8 => compare_elementwise!(Int8Array), - DataType::Int16 => compare_elementwise!(Int16Array), - DataType::Int32 => compare_elementwise!(Int32Array), - DataType::Int64 => compare_elementwise!(Int64Array), - DataType::UInt8 => compare_elementwise!(UInt8Array), - DataType::UInt16 => compare_elementwise!(UInt16Array), - DataType::UInt32 => compare_elementwise!(UInt32Array), - DataType::UInt64 => compare_elementwise!(UInt64Array), - DataType::Float32 => compare_elementwise!(Float32Array), - DataType::Float64 => compare_elementwise!(Float64Array), - DataType::Binary => compare_elementwise!(BinaryArray), - DataType::BinaryView => compare_elementwise!(BinaryViewArray), - DataType::FixedSizeBinary(_) => compare_elementwise!(FixedSizeBinaryArray), - DataType::LargeBinary => compare_elementwise!(LargeBinaryArray), - DataType::Utf8 => compare_elementwise!(StringArray), - DataType::Utf8View => compare_elementwise!(StringViewArray), - DataType::LargeUtf8 => compare_elementwise!(LargeStringArray), - DataType::Decimal128(..) => compare_elementwise!(Decimal128Array), - DataType::Timestamp(time_unit, None) => match time_unit { - TimeUnit::Second => compare_elementwise!(TimestampSecondArray), - TimeUnit::Millisecond => compare_elementwise!(TimestampMillisecondArray), - TimeUnit::Microsecond => compare_elementwise!(TimestampMicrosecondArray), - TimeUnit::Nanosecond => compare_elementwise!(TimestampNanosecondArray), - }, - DataType::Date32 => compare_elementwise!(Date32Array), - DataType::Date64 => compare_elementwise!(Date64Array), - _ => return false, // Unsupported type, use fallback - } + dispatch_elementwise!(left_array.data_type(), equal_bits, left_indices, null_equality, compare_elementwise); true } @@ -2116,8 +2126,7 @@ fn compare_rows_elementwise_multi( equal_bits: &mut BooleanBufferBuilder, left_indices: &[u64], right_indices: &[u32], - left_arrays_per_batch: &[Vec], - key_idx: usize, + left_arrays: &[&ArrayRef], right_array: &ArrayRef, null_equality: NullEquality, ) -> bool { @@ -2127,9 +2136,9 @@ fn compare_rows_elementwise_multi( macro_rules! compare_multi { ($array_type:ty) => {{ - let left_typed: Vec<&$array_type> = left_arrays_per_batch + let left_typed: Vec<&$array_type> = left_arrays .iter() - .map(|keys| keys[key_idx].as_any().downcast_ref::<$array_type>().unwrap()) + .map(|a| a.as_any().downcast_ref::<$array_type>().unwrap()) .collect(); let right = right_array.as_any().downcast_ref::<$array_type>().unwrap(); do_compare_elementwise_multi( @@ -2143,46 +2152,7 @@ fn compare_rows_elementwise_multi( }}; } - match right_array.data_type() { - DataType::Null => { - match null_equality { - NullEquality::NullEqualsNothing => { - for i in 0..left_indices.len() { - equal_bits.set_bit(i, false); - } - } - NullEquality::NullEqualsNull => {} - } - } - DataType::Boolean => compare_multi!(BooleanArray), - DataType::Int8 => compare_multi!(Int8Array), - DataType::Int16 => compare_multi!(Int16Array), - DataType::Int32 => compare_multi!(Int32Array), - DataType::Int64 => compare_multi!(Int64Array), - DataType::UInt8 => compare_multi!(UInt8Array), - DataType::UInt16 => compare_multi!(UInt16Array), - DataType::UInt32 => compare_multi!(UInt32Array), - DataType::UInt64 => compare_multi!(UInt64Array), - DataType::Float32 => compare_multi!(Float32Array), - DataType::Float64 => compare_multi!(Float64Array), - DataType::Binary => compare_multi!(BinaryArray), - DataType::BinaryView => compare_multi!(BinaryViewArray), - DataType::FixedSizeBinary(_) => compare_multi!(FixedSizeBinaryArray), - DataType::LargeBinary => compare_multi!(LargeBinaryArray), - DataType::Utf8 => compare_multi!(StringArray), - DataType::Utf8View => compare_multi!(StringViewArray), - DataType::LargeUtf8 => compare_multi!(LargeStringArray), - DataType::Decimal128(..) => compare_multi!(Decimal128Array), - DataType::Timestamp(time_unit, None) => match time_unit { - TimeUnit::Second => compare_multi!(TimestampSecondArray), - TimeUnit::Millisecond => compare_multi!(TimestampMillisecondArray), - TimeUnit::Microsecond => compare_multi!(TimestampMicrosecondArray), - TimeUnit::Nanosecond => compare_multi!(TimestampNanosecondArray), - }, - DataType::Date32 => compare_multi!(Date32Array), - DataType::Date64 => compare_multi!(Date64Array), - _ => return false, - } + dispatch_elementwise!(right_array.data_type(), equal_bits, left_indices, null_equality, compare_multi); true } @@ -2222,6 +2192,10 @@ fn do_compare_elementwise_multi( } } } else { + // Pre-compute null buffers per batch to avoid repeated method calls in the loop + let left_nulls_per_batch: Vec> = + left_arrays.iter().map(|a| a.nulls()).collect(); + for i in 0..num_rows { if !equal_bits.get_bit(i) { continue; @@ -2230,14 +2204,16 @@ fn do_compare_elementwise_multi( let batch_idx = (packed >> 32) as usize; let row_idx = (packed & 0xFFFFFFFF) as usize; let r_idx = right_indices[i] as usize; - let left = &left_arrays[batch_idx]; - let l_null = left.nulls().is_some_and(|n| !n.is_valid(row_idx)); + let l_null = left_nulls_per_batch[batch_idx] + .is_some_and(|n| !n.is_valid(row_idx)); let r_null = right_nulls.is_some_and(|n| !n.is_valid(r_idx)); let is_equal = match (l_null, r_null) { (true, true) => null_equality == NullEquality::NullEqualsNull, (true, false) | (false, true) => false, - (false, false) => left.value(row_idx) == right.value(r_idx), + (false, false) => { + left_arrays[batch_idx].value(row_idx) == right.value(r_idx) + } }; if !is_equal { equal_bits.set_bit(i, false); From 37531f60a4777dbd08aa4025b0c99db5de5ea053 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 17 Mar 2026 10:38:09 +0100 Subject: [PATCH 26/26] Coalesce batches in CoalescePartitionsExec for downstream performance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When merging multiple partitions, CoalescePartitionsExec now coalesces small batches into larger ones (target_batch_size from session config). This benefits downstream operators like hash join whose build side receives fewer, larger batches — enabling the fast single-batch take() path instead of the slower multi-batch interleave() path in build_batch_from_indices. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-plan/src/coalesce_partitions.rs | 87 +++++++++++++++++-- 1 file changed, 78 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 5ea3589f22b3e..df8b53254d084 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -19,14 +19,17 @@ //! into a single partition use std::any::Any; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::stream::{ObservedStream, RecordBatchReceiverStream}; use super::{ - DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, - Statistics, + DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; +use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; @@ -34,11 +37,15 @@ use crate::sort_pushdown::SortOrderPushdownResult; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; +use futures::ready; +use futures::stream::{Stream, StreamExt}; /// Merge execution plan executes partitions in parallel and combines them into a single /// partition. No guarantees are made about the order of the resulting partition. @@ -214,6 +221,8 @@ impl ExecutionPlan for CoalescePartitionsExec { let elapsed_compute = baseline_metrics.elapsed_compute().clone(); let _timer = elapsed_compute.timer(); + let batch_size = context.session_config().batch_size(); + // use a stream that allows each sender to put in at // least one result in an attempt to maximize // parallelism. @@ -231,11 +240,23 @@ impl ExecutionPlan for CoalescePartitionsExec { } let stream = builder.build(); - Ok(Box::pin(ObservedStream::new( - stream, - baseline_metrics, - self.fetch, - ))) + // Coalesce small batches from multiple partitions into + // larger batches of target_batch_size. This improves + // downstream performance (e.g. hash join build side + // benefits from fewer, larger batches). + Ok(Box::pin(CoalescedStream { + input: Box::pin(ObservedStream::new( + stream, + baseline_metrics, + self.fetch, + )), + coalescer: LimitedBatchCoalescer::new( + self.schema(), + batch_size, + None, // fetch is already handled by ObservedStream + ), + completed: false, + })) } } } @@ -352,6 +373,55 @@ impl ExecutionPlan for CoalescePartitionsExec { } } +/// Stream that coalesces small batches into larger ones using +/// [`LimitedBatchCoalescer`]. +struct CoalescedStream { + input: SendableRecordBatchStream, + coalescer: LimitedBatchCoalescer, + completed: bool, +} + +impl Stream for CoalescedStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + if let Some(batch) = self.coalescer.next_completed_batch() { + return Poll::Ready(Some(Ok(batch))); + } + if self.completed { + return Poll::Ready(None); + } + let input_batch = ready!(self.input.poll_next_unpin(cx)); + match input_batch { + None => { + self.completed = true; + self.coalescer.finish()?; + } + Some(Ok(batch)) => { + match self.coalescer.push_batch(batch)? { + PushBatchStatus::Continue => {} + PushBatchStatus::LimitReached => { + self.completed = true; + self.coalescer.finish()?; + } + } + } + other => return Poll::Ready(other), + } + } + } +} + +impl RecordBatchStream for CoalescedStream { + fn schema(&self) -> SchemaRef { + self.coalescer.schema() + } +} + #[cfg(test)] mod tests { use super::*; @@ -383,10 +453,9 @@ mod tests { 1 ); - // the result should contain 4 batches (one per input partition) + // the result should contain all rows (coalesced into fewer batches) let iter = merge.execute(0, task_ctx)?; let batches = common::collect(iter).await?; - assert_eq!(batches.len(), num_partitions); // there should be a total of 400 rows (100 per each partition) let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();