diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs
index 915bd56..b0aa0ec 100644
--- a/crates/paimon/src/arrow/format/parquet.rs
+++ b/crates/paimon/src/arrow/format/parquet.rs
@@ -33,7 +33,7 @@ use arrow_schema::ArrowError;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::BoxFuture;
-use futures::{StreamExt, TryFutureExt};
+use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{
     ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
@@ -785,6 +785,18 @@ struct ArrowFileReader {
     r: Box<dyn FileRead>,
 }
 
+/// coalesce threshold: 1 MiB.
+const RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
+/// concurrent range fetches.
+const RANGE_FETCH_CONCURRENCY: usize = 10;
+/// metadata prefetch hint: 512 KiB.
+const METADATA_SIZE_HINT: usize = 512 * 1024;
+/// Minimum range size for splitting: 4 MiB.
+/// The block size used for split alignment and as the minimum split
+/// granularity. Ranges smaller than this will not be split further to
+/// avoid excessive small IO requests whose per-request overhead dominates.
+const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024;
+
 impl ArrowFileReader {
     fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
         Self { file_size, r }
@@ -809,14 +821,128 @@ impl AsyncFileReader for ArrowFileReader {
         self.read_bytes(range)
     }
 
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let coalesce_bytes = RANGE_COALESCE_BYTES;
+        let concurrency = RANGE_FETCH_CONCURRENCY;
+
+        async move {
+            if ranges.is_empty() {
+                return Ok(vec![]);
+            }
+
+            // Two-phase range optimization:
+            // Phase 1: Merge nearby ranges based on coalesce threshold.
+            let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
+            // Phase 2: Split large merged ranges to utilize concurrency,
+            // but only at original range boundaries.
+            let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency);
+
+            // Fetch merged ranges concurrently.
+            let r = &self.r;
+            let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
+                // All ranges fit within the concurrency limit — fire them all at once.
+                futures::future::try_join_all(fetch_ranges.iter().map(|range| {
+                    r.read(range.clone())
+                        .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into()))
+                }))
+                .await?
+            } else {
+                // More ranges than concurrency slots — use buffered stream.
+                futures::stream::iter(fetch_ranges.iter().cloned())
+                    .map(|range| async move {
+                        r.read(range).await.map_err(|e| {
+                            parquet::errors::ParquetError::External(format!("{e}").into())
+                        })
+                    })
+                    .buffered(concurrency)
+                    .try_collect()
+                    .await?
+            };
+
+            // Slice the fetched data back into the originally requested
+            // ranges. A single original range may span multiple fetch
+            // chunks, so we copy from as many chunks as needed.
+            let result: parquet::errors::Result<Vec<Bytes>> = ranges
+                .iter()
+                .map(|range| {
+                    // Find the first fetch chunk whose end is past range.start.
+                    let first = fetch_ranges.partition_point(|v| v.end <= range.start);
+                    if first >= fetch_ranges.len() {
+                        return Err(parquet::errors::ParquetError::General(format!(
+                            "No fetch range covers requested range {}..{}",
+                            range.start, range.end
+                        )));
+                    }
+
+                    let need = (range.end - range.start) as usize;
+
+                    // Fast path: the original range fits entirely within one
+                    // fetch chunk — zero-copy slice.
+                    let fr = &fetch_ranges[first];
+                    if range.end <= fr.end {
+                        let start = (range.start - fr.start) as usize;
+                        let end = (range.end - fr.start) as usize;
+                        return Ok(fetched[first].slice(start..end));
+                    }
+
+                    // Slow path: the original range spans multiple fetch
+                    // chunks — copy pieces into a new buffer (mirrors Java's
+                    // copyMultiBytesToBytes).
+                    let mut buf = Vec::with_capacity(need);
+                    let mut pos = range.start;
+                    for i in first..fetch_ranges.len() {
+                        if pos >= range.end {
+                            break;
+                        }
+                        let fr = &fetch_ranges[i];
+                        let chunk = &fetched[i];
+                        let src_start = (pos - fr.start) as usize;
+                        let src_end = ((range.end.min(fr.end)) - fr.start) as usize;
+                        if src_end > chunk.len() {
+                            return Err(parquet::errors::ParquetError::General(format!(
+                                "Fetched data too short for range {}..{}: \
+                                 chunk {}..{} has {} bytes, need up to offset {}",
+                                range.start,
+                                range.end,
+                                fr.start,
+                                fr.end,
+                                chunk.len(),
+                                src_end,
+                            )));
+                        }
+                        buf.extend_from_slice(&chunk[src_start..src_end]);
+                        pos = fr.end;
+                    }
+                    if buf.len() != need {
+                        return Err(parquet::errors::ParquetError::General(format!(
+                            "Assembled {} bytes for range {}..{}, expected {}",
+                            buf.len(),
+                            range.start,
+                            range.end,
+                            need,
+                        )));
+                    }
+                    Ok(Bytes::from(buf))
+                })
+                .collect();
+            result
+        }
+        .boxed()
+    }
+
     fn get_metadata(
         &mut self,
         options: Option<&ArrowReaderOptions>,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         let metadata_opts = options.map(|o| o.metadata_options().clone());
+        let prefetch_hint = Some(METADATA_SIZE_HINT);
         Box::pin(async move {
             let file_size = self.file_size;
             let metadata = ParquetMetaDataReader::new()
+                .with_prefetch_hint(prefetch_hint)
                 .with_metadata_options(metadata_opts)
                 .load_and_finish(self, file_size)
                 .await?;
@@ -825,6 +951,98 @@ impl AsyncFileReader for ArrowFileReader {
     }
 }
 
+// ---------------------------------------------------------------------------
+// Range coalescing
+// ---------------------------------------------------------------------------
+
+/// Merge nearby byte ranges to reduce the number of requests.
+///
+/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range.
+/// The input does not need to be sorted.
+fn merge_byte_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
+    if ranges.is_empty() {
+        return vec![];
+    }
+
+    let mut sorted = ranges.to_vec();
+    sorted.sort_unstable_by_key(|r| r.start);
+
+    let mut merged = Vec::with_capacity(sorted.len());
+    let mut start_idx = 0;
+    let mut end_idx = 1;
+
+    while start_idx != sorted.len() {
+        let mut range_end = sorted[start_idx].end;
+
+        while end_idx != sorted.len()
+            && sorted[end_idx]
+                .start
+                .checked_sub(range_end)
+                .map(|delta| delta <= coalesce)
+                .unwrap_or(true)
+        {
+            range_end = range_end.max(sorted[end_idx].end);
+            end_idx += 1;
+        }
+
+        merged.push(sorted[start_idx].start..range_end);
+        start_idx = end_idx;
+        end_idx += 1;
+    }
+
+    merged
+}
+
+/// Split merged ranges into fixed-size batches to utilize concurrency.
+/// Each merged range is divided into chunks of `expected_size`,
+/// with the last chunk taking whatever remains.
+/// Ranges smaller than `2 * IO_BLOCK_SIZE` are kept as-is to
+/// avoid excessive small IO requests.
+fn split_ranges_for_concurrency(merged: Vec<Range<u64>>, concurrency: usize) -> Vec<Range<u64>> {
+    if merged.is_empty() || concurrency <= 1 {
+        return merged;
+    }
+
+    let mut result = Vec::with_capacity(merged.len());
+
+    for range in &merged {
+        let length = range.end - range.start;
+        let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64));
+        // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that
+        // every split boundary is 4 MB-aligned relative to the range start.
+        let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE;
+        let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2);
+
+        let mut offset = range.start;
+        let end = range.end;
+
+        // Align the first split boundary: if `offset` is not 4 MB-aligned,
+        // emit a short head chunk so that all subsequent chunks start on a
+        // 4 MB boundary.
+        let misalign = offset % IO_BLOCK_SIZE;
+        if misalign != 0 {
+            let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end);
+            result.push(offset..first_end);
+            offset = first_end;
+        }
+
+        loop {
+            if offset >= end {
+                break;
+            }
+            if end - offset < min_tail_size {
+                result.push(offset..end);
+                break;
+            } else {
+                result.push(offset..offset + expected_size);
+                offset += expected_size;
+            }
+        }
+    }
+
+    result
+}
+
 // ---------------------------------------------------------------------------
 // Tests
 // ---------------------------------------------------------------------------
@@ -878,4 +1096,149 @@ mod tests {
 
         assert!(row_filter.is_some());
     }
+
+    // -----------------------------------------------------------------------
+    // merge_byte_ranges tests
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn test_merge_byte_ranges_empty() {
+        assert_eq!(
+            super::merge_byte_ranges(&[], 1024),
+            Vec::<Range<u64>>::new()
+        );
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_no_coalesce() {
+        // Ranges far apart should not be merged
+        let ranges = vec![0..100, 1_000_000..1_000_100];
+        let merged = super::merge_byte_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_coalesce() {
+        // Ranges within the gap threshold should be merged
+        let ranges = vec![0..100, 200..300, 500..600];
+        let merged = super::merge_byte_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..600]);
+    }
+
+    #[test]
+    fn test_merge_byte_ranges_zero_coalesce_gap() {
+        // With coalesce=0, ranges with a 1-byte gap should NOT merge
+        let ranges = vec![0..100, 101..200];
+        let merged = super::merge_byte_ranges(&ranges, 0);
+        assert_eq!(merged, vec![0..100, 101..200]);
+    }
+
+    // -----------------------------------------------------------------------
+    // split_ranges_for_concurrency tests
+    // -----------------------------------------------------------------------
+
+    #[test]
+    fn test_split_aligned_range_0_to_20mb() {
+        // 0..20MB, concurrency=4:
+        //   raw_size = max(4MB, 5MB+1) = 5MB+1
+        //   expected_size = ceil((5MB+1)/4MB)*4MB = 8MB
+        //   min_tail_size = max(8MB, 8MB) = 8MB
+        // No misalign. Chunks: [0..8, 8..16, 16..20]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![0..20 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]);
+    }
+
+    #[test]
+    fn test_split_unaligned_start_6_to_14mb() {
+        // 6MB..14MB, concurrency=4:
+        //   raw_size = max(4MB, 2MB+1) = 4MB
+        //   expected_size = 4MB, min_tail_size = 8MB
+        // Head: 6..8MB. Loop: 8+8=16 > 14 → tail 8..14.
+        // Result: [6..8, 8..14]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![6 * mb..14 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]);
+    }
+
+    #[test]
+    fn test_split_unaligned_start_6_to_22mb() {
+        // 6MB..22MB, concurrency=4:
+        //   raw_size = max(4MB, ceil(16MB/4)) = 4MB
+        //   expected_size = ceil(4MB/4MB)*4MB = 4MB
+        //   min_tail_size = max(4MB, 8MB) = 8MB
+        // Head: 6..8MB (misalign=2MB).
+        // Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 16..22.
+        // Result: [6..8, 8..12, 12..16, 16..22]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![6 * mb..22 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(
+            result,
+            vec![
+                6 * mb..8 * mb,
+                8 * mb..12 * mb,
+                12 * mb..16 * mb,
+                16 * mb..22 * mb,
+            ]
+        );
+    }
+
+    #[test]
+    fn test_split_already_aligned_8_to_24mb() {
+        // 8MB..24MB, concurrency=4:
+        //   raw_size = max(4MB, ceil(16MB/4)) = 4MB
+        //   expected_size = 4MB, min_tail_size = 8MB
+        // No misalign.
+        // Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20; 24-20=4<8 → tail 20..24.
+        // Result: [8..12, 12..16, 16..20, 20..24]
+        let mb = 1024 * 1024u64;
+        #[allow(clippy::single_range_in_vec_init)]
+        let merged = vec![8 * mb..24 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(
+            result,
+            vec![
+                8 * mb..12 * mb,
+                12 * mb..16 * mb,
+                16 * mb..20 * mb,
+                20 * mb..24 * mb,
+            ]
+        );
+    }
+
+    #[test]
+    fn test_split_multiple_ranges() {
+        // [0..20MB, 24..44MB], concurrency=4:
+        // Range 0..20MB → [0..8, 8..16, 16..20] (same as test above)
+        // Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign.
+        //   24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44.
+        // Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44]
+        let mb = 1024 * 1024u64;
+        let merged = vec![0..20 * mb, 24 * mb..44 * mb];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert_eq!(
+            result,
+            vec![
+                0..8 * mb,
+                8 * mb..16 * mb,
+                16 * mb..20 * mb,
+                24 * mb..32 * mb,
+                32 * mb..40 * mb,
+                40 * mb..44 * mb,
+            ]
+        );
+    }
+
+    #[test]
+    fn test_split_empty() {
+        let merged: Vec<Range<u64>> = vec![];
+        let result = super::split_ranges_for_concurrency(merged, 4);
+        assert!(result.is_empty());
+    }
 }