From aa707fa8b4f12e4805f8ec2acbe02cca294471fa Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 8 Apr 2026 17:13:01 +0800 Subject: [PATCH 01/24] coalsce --- crates/paimon/src/arrow/format/parquet.rs | 173 +++++++++++++++++- .../src/catalog/rest/rest_token_file_io.rs | 10 +- 2 files changed, 172 insertions(+), 11 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 915bd565..8a216409 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -33,7 +33,7 @@ use arrow_schema::ArrowError; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{StreamExt, TryFutureExt}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; @@ -772,22 +772,28 @@ fn build_row_ranges_selection( /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader. /// -/// # TODO +/// Supports range coalescing to reduce the number of object-store round-trips +/// when reading column chunks from remote storage. /// -/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) -/// contains the following hints to speed up metadata loading, similar to iceberg, we can consider adding them to this struct: -/// -/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer. -/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`]. -/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`]. +/// Inspired by iceberg-rust's `ArrowFileReader` (PR #2181). struct ArrowFileReader { file_size: u64, r: Box, + /// Maximum gap (in bytes) between two ranges that will be merged into a + /// single fetch request. Defaults to 1 MiB. 
+ range_coalesce_bytes: u64, } +/// Default coalesce threshold: 1 MiB. +const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; + impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { - Self { file_size, r } + Self { + file_size, + r, + range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, + } } fn read_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { @@ -809,6 +815,48 @@ impl AsyncFileReader for ArrowFileReader { self.read_bytes(range) } + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + let coalesce_bytes = self.range_coalesce_bytes; + + async move { + if ranges.is_empty() { + return Ok(vec![]); + } + + // Merge nearby ranges to reduce the number of object-store requests. + let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes); + + // Fetch merged ranges sequentially (FileRead is !Sync so we cannot + // use buffered concurrency on &mut self). The coalescing itself is + // the main win — it turns N small requests into M merged requests + // where M << N for typical column-chunk access patterns. + let mut fetched: Vec = Vec::with_capacity(fetch_ranges.len()); + for range in &fetch_ranges { + let bytes = self.r.read(range.clone()).await.map_err(|e| { + parquet::errors::ParquetError::External(format!("{e}").into()) + })?; + fetched.push(bytes); + } + + // Slice the fetched data back into the originally requested ranges. 
+ Ok(ranges + .iter() + .map(|range| { + let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let fetch_range = &fetch_ranges[idx]; + let fetch_bytes = &fetched[idx]; + let start = (range.start - fetch_range.start) as usize; + let end = (range.end - fetch_range.start) as usize; + fetch_bytes.slice(start..end.min(fetch_bytes.len())) + }) + .collect()) + } + .boxed() + } + fn get_metadata( &mut self, options: Option<&ArrowReaderOptions>, @@ -825,6 +873,48 @@ impl AsyncFileReader for ArrowFileReader { } } +// --------------------------------------------------------------------------- +// Range coalescing +// --------------------------------------------------------------------------- + +/// Merge nearby byte ranges to reduce the number of object-store requests. +/// +/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range. +/// The input does not need to be sorted. +fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut sorted = ranges.to_vec(); + sorted.sort_unstable_by_key(|r| r.start); + + let mut merged = Vec::with_capacity(sorted.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != sorted.len() { + let mut range_end = sorted[start_idx].end; + + while end_idx != sorted.len() + && sorted[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(sorted[end_idx].end); + end_idx += 1; + } + + merged.push(sorted[start_idx].start..range_end); + start_idx = end_idx; + end_idx += 1; + } + + merged +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -878,4 +968,69 @@ mod tests { assert!(row_filter.is_some()); } + + // ----------------------------------------------------------------------- + // merge_byte_ranges tests + // 
----------------------------------------------------------------------- + + #[test] + fn test_merge_byte_ranges_empty() { + assert_eq!( + super::merge_byte_ranges(&[], 1024), + Vec::>::new() + ); + } + + #[test] + fn test_merge_byte_ranges_no_coalesce() { + // Ranges far apart should not be merged + let ranges = vec![0..100, 1_000_000..1_000_100]; + let merged = super::merge_byte_ranges(&ranges, 1024); + assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]); + } + + #[test] + fn test_merge_byte_ranges_coalesce() { + // Ranges within the gap threshold should be merged + let ranges = vec![0..100, 200..300, 500..600]; + let merged = super::merge_byte_ranges(&ranges, 1024); + assert_eq!(merged, vec![0..600]); + } + + #[test] + fn test_merge_byte_ranges_overlapping() { + let ranges = vec![0..200, 100..300]; + let merged = super::merge_byte_ranges(&ranges, 0); + assert_eq!(merged, vec![0..300]); + } + + #[test] + fn test_merge_byte_ranges_unsorted() { + let ranges = vec![500..600, 0..100, 200..300]; + let merged = super::merge_byte_ranges(&ranges, 1024); + assert_eq!(merged, vec![0..600]); + } + + #[test] + fn test_merge_byte_ranges_single() { + let ranges = vec![100..200]; + let merged = super::merge_byte_ranges(&ranges, 1024); + assert_eq!(merged, vec![100..200]); + } + + #[test] + fn test_merge_byte_ranges_zero_coalesce_adjacent() { + // With coalesce=0, adjacent ranges (gap=0) should still merge + let ranges = vec![0..100, 100..200]; + let merged = super::merge_byte_ranges(&ranges, 0); + assert_eq!(merged, vec![0..200]); + } + + #[test] + fn test_merge_byte_ranges_zero_coalesce_gap() { + // With coalesce=0, ranges with a 1-byte gap should NOT merge + let ranges = vec![0..100, 101..200]; + let merged = super::merge_byte_ranges(&ranges, 0); + assert_eq!(merged, vec![0..100, 101..200]); + } } diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs index 6233eb10..9e7a5a5d 100644 --- 
a/crates/paimon/src/catalog/rest/rest_token_file_io.rs +++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs @@ -93,8 +93,14 @@ impl RESTTokenFileIO { match token_guard.as_ref() { Some(token) => { // Merge catalog options (base) with token credentials (override) - let merged_props = - RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token)); + // token.token["fs.oss.endpoint"] = oss-cn-hangzhou.aliyuncs.com + let mut token_with_endpoint = token.token.clone(); + token_with_endpoint.insert( + "fs.oss.endpoint".to_string(), + "oss-cn-hangzhou.aliyuncs.com".to_string(), + ); + let base = self.catalog_options.to_map().clone(); + let merged_props = RESTUtil::merge(Some(&base), Some(&token_with_endpoint)); // Build FileIO with merged properties let mut builder = FileIO::from_path(&self.path)?; builder = builder.with_props(merged_props); From ccc822701757d28a71ed22e48e248b24073fd375 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 8 Apr 2026 17:24:25 +0800 Subject: [PATCH 02/24] prefetch --- crates/paimon/src/arrow/format/parquet.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 8a216409..f6a6517c 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -782,10 +782,16 @@ struct ArrowFileReader { /// Maximum gap (in bytes) between two ranges that will be merged into a /// single fetch request. Defaults to 1 MiB. range_coalesce_bytes: u64, + /// Hint for the number of bytes to speculatively read from the end of the + /// file when loading Parquet metadata. A sufficiently large hint reduces + /// footer loading from 2 round-trips to 1. Defaults to 512 KiB. + metadata_size_hint: Option, } /// Default coalesce threshold: 1 MiB. const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; +/// Default metadata prefetch hint: 512 KiB (same as DataFusion's default). 
+const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024; impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { @@ -793,6 +799,7 @@ impl ArrowFileReader { file_size, r, range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, + metadata_size_hint: Some(DEFAULT_METADATA_SIZE_HINT), } } @@ -862,9 +869,11 @@ impl AsyncFileReader for ArrowFileReader { options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result>> { let metadata_opts = options.map(|o| o.metadata_options().clone()); + let prefetch_hint = self.metadata_size_hint; Box::pin(async move { let file_size = self.file_size; let metadata = ParquetMetaDataReader::new() + .with_prefetch_hint(prefetch_hint) .with_metadata_options(metadata_opts) .load_and_finish(self, file_size) .await?; From ff8d266dd53693d999368f0869f28cecb06500ed Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 8 Apr 2026 19:45:07 +0800 Subject: [PATCH 03/24] paraReadBytes --- crates/paimon/src/arrow/format/parquet.rs | 38 +++++++++++++++-------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index f6a6517c..36108542 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -33,7 +33,7 @@ use arrow_schema::ArrowError; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; @@ -782,6 +782,8 @@ struct ArrowFileReader { /// Maximum gap (in bytes) between two ranges that will be merged into a /// single fetch request. Defaults to 1 MiB. range_coalesce_bytes: u64, + /// Maximum number of merged ranges to fetch concurrently. Defaults to 8. 
+ range_fetch_concurrency: usize, /// Hint for the number of bytes to speculatively read from the end of the /// file when loading Parquet metadata. A sufficiently large hint reduces /// footer loading from 2 round-trips to 1. Defaults to 512 KiB. @@ -790,6 +792,8 @@ struct ArrowFileReader { /// Default coalesce threshold: 1 MiB. const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; +/// Default concurrent range fetches. +const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 8; /// Default metadata prefetch hint: 512 KiB (same as DataFusion's default). const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024; @@ -799,6 +803,7 @@ impl ArrowFileReader { file_size, r, range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, + range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, metadata_size_hint: Some(DEFAULT_METADATA_SIZE_HINT), } } @@ -827,6 +832,7 @@ impl AsyncFileReader for ArrowFileReader { ranges: Vec>, ) -> BoxFuture<'_, parquet::errors::Result>> { let coalesce_bytes = self.range_coalesce_bytes; + let concurrency = self.range_fetch_concurrency.max(1); async move { if ranges.is_empty() { @@ -835,18 +841,24 @@ impl AsyncFileReader for ArrowFileReader { // Merge nearby ranges to reduce the number of object-store requests. let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes); - - // Fetch merged ranges sequentially (FileRead is !Sync so we cannot - // use buffered concurrency on &mut self). The coalescing itself is - // the main win — it turns N small requests into M merged requests - // where M << N for typical column-chunk access patterns. - let mut fetched: Vec = Vec::with_capacity(fetch_ranges.len()); - for range in &fetch_ranges { - let bytes = self.r.read(range.clone()).await.map_err(|e| { - parquet::errors::ParquetError::External(format!("{e}").into()) - })?; - fetched.push(bytes); - } + let r = &self.r; + + eprintln!( + "[get_byte_ranges] original={}, merged={}", + ranges.len(), + fetch_ranges.len() + ); + + // Fetch merged ranges concurrently. 
+ let fetched: Vec = futures::stream::iter(fetch_ranges.iter().cloned()) + .map(|range| async move { + r.read(range) + .await + .map_err(|e| parquet::errors::ParquetError::External(Box::new(e))) + }) + .buffered(concurrency) + .try_collect() + .await?; // Slice the fetched data back into the originally requested ranges. Ok(ranges From 9de32da9908d50b00ddb68c8734f9606f6481d73 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 8 Apr 2026 21:03:48 +0800 Subject: [PATCH 04/24] paraReadBytes2 --- crates/paimon/src/arrow/format/parquet.rs | 94 ++++++++++++----------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 36108542..886ca659 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -828,8 +828,8 @@ impl AsyncFileReader for ArrowFileReader { } fn get_byte_ranges( - &mut self, - ranges: Vec>, + &mut self, + ranges: Vec>, ) -> BoxFuture<'_, parquet::errors::Result>> { let coalesce_bytes = self.range_coalesce_bytes; let concurrency = self.range_fetch_concurrency.max(1); @@ -839,22 +839,27 @@ impl AsyncFileReader for ArrowFileReader { return Ok(vec![]); } - // Merge nearby ranges to reduce the number of object-store requests. - let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes); - let r = &self.r; + // Calculate max merged range size to ensure enough ranges for concurrency. + // For column-pruned reads, ranges are naturally spread out so this has no effect. + // For full-table reads, this prevents everything from merging into 1 huge range. 
+ let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let max_merge_bytes = if concurrency > 1 { + (total_bytes / concurrency as u64).max(1) + } else { + u64::MAX + }; - eprintln!( - "[get_byte_ranges] original={}, merged={}", - ranges.len(), - fetch_ranges.len() - ); + let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes, max_merge_bytes); // Fetch merged ranges concurrently. + // NOTE: requires FileRead to be Sync. If FileRead is !Sync, either + // add Sync bound or fall back to the sequential loop below. + let r = &self.r; let fetched: Vec = futures::stream::iter(fetch_ranges.iter().cloned()) .map(|range| async move { r.read(range) .await - .map_err(|e| parquet::errors::ParquetError::External(Box::new(e))) + .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into())) }) .buffered(concurrency) .try_collect() @@ -902,39 +907,40 @@ impl AsyncFileReader for ArrowFileReader { /// /// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range. /// The input does not need to be sorted. 
-fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { - if ranges.is_empty() { - return vec![]; - } - - let mut sorted = ranges.to_vec(); - sorted.sort_unstable_by_key(|r| r.start); - - let mut merged = Vec::with_capacity(sorted.len()); - let mut start_idx = 0; - let mut end_idx = 1; - - while start_idx != sorted.len() { - let mut range_end = sorted[start_idx].end; - - while end_idx != sorted.len() - && sorted[end_idx] - .start - .checked_sub(range_end) - .map(|delta| delta <= coalesce) - .unwrap_or(true) - { - range_end = range_end.max(sorted[end_idx].end); - end_idx += 1; - } - - merged.push(sorted[start_idx].start..range_end); - start_idx = end_idx; - end_idx += 1; - } - - merged -} +fn merge_byte_ranges(ranges: &[Range], coalesce: u64, max_merge_bytes: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut sorted = ranges.to_vec(); + sorted.sort_unstable_by_key(|r| r.start); + + let mut merged = Vec::with_capacity(sorted.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != sorted.len() { + let mut range_end = sorted[start_idx].end; + + while end_idx != sorted.len() + && sorted[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + && (sorted[end_idx].end - sorted[start_idx].start) <= max_merge_bytes + { + range_end = range_end.max(sorted[end_idx].end); + end_idx += 1; + } + + merged.push(sorted[start_idx].start..range_end); + start_idx = end_idx; + end_idx += 1; + } + + merged + } // --------------------------------------------------------------------------- // Tests From ba47ac977c81cdf24be4070426d2cd1f38b91464 Mon Sep 17 00:00:00 2001 From: umi Date: Wed, 8 Apr 2026 21:11:02 +0800 Subject: [PATCH 05/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 31 +++++++++++++++-------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 886ca659..e80ff1eb 100644 --- 
a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -852,18 +852,27 @@ impl AsyncFileReader for ArrowFileReader { let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes, max_merge_bytes); // Fetch merged ranges concurrently. - // NOTE: requires FileRead to be Sync. If FileRead is !Sync, either - // add Sync bound or fall back to the sequential loop below. let r = &self.r; - let fetched: Vec = futures::stream::iter(fetch_ranges.iter().cloned()) - .map(|range| async move { - r.read(range) - .await - .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into())) - }) - .buffered(concurrency) - .try_collect() - .await?; + let fetched: Vec = if fetch_ranges.len() <= concurrency { + // All ranges fit within the concurrency limit — fire them all at once. + futures::future::try_join_all(fetch_ranges.iter().map(|range| { + r.read(range.clone()).map_err(|e| { + parquet::errors::ParquetError::External(format!("{e}").into()) + }) + })) + .await? + } else { + // More ranges than concurrency slots — use buffered stream. + futures::stream::iter(fetch_ranges.iter().cloned()) + .map(|range| async move { + r.read(range).await.map_err(|e| { + parquet::errors::ParquetError::External(format!("{e}").into()) + }) + }) + .buffered(concurrency) + .try_collect() + .await? + }; // Slice the fetched data back into the originally requested ranges. 
Ok(ranges From 317f7d71a7265a35285400182b9a766200f1e53f Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 00:36:47 +0800 Subject: [PATCH 06/24] averPara --- crates/paimon/src/arrow/format/parquet.rs | 192 ++++++++++++------ .../src/catalog/rest/rest_token_file_io.rs | 10 +- 2 files changed, 132 insertions(+), 70 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index e80ff1eb..2af72020 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -772,10 +772,14 @@ fn build_row_ranges_selection( /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader. /// -/// Supports range coalescing to reduce the number of object-store round-trips -/// when reading column chunks from remote storage. +/// # TODO /// -/// Inspired by iceberg-rust's `ArrowFileReader` (PR #2181). +/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64) +/// contains the following hints to speed up metadata loading, similar to iceberg, we can consider adding them to this struct: +/// +/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer. +/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`]. +/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`]. struct ArrowFileReader { file_size: u64, r: Box, @@ -794,7 +798,7 @@ struct ArrowFileReader { const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; /// Default concurrent range fetches. const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 8; -/// Default metadata prefetch hint: 512 KiB (same as DataFusion's default). +/// Default metadata prefetch hint: 512 KiB. 
const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024; impl ArrowFileReader { @@ -828,8 +832,8 @@ impl AsyncFileReader for ArrowFileReader { } fn get_byte_ranges( - &mut self, - ranges: Vec>, + &mut self, + ranges: Vec>, ) -> BoxFuture<'_, parquet::errors::Result>> { let coalesce_bytes = self.range_coalesce_bytes; let concurrency = self.range_fetch_concurrency.max(1); @@ -839,26 +843,19 @@ impl AsyncFileReader for ArrowFileReader { return Ok(vec![]); } - // Calculate max merged range size to ensure enough ranges for concurrency. - // For column-pruned reads, ranges are naturally spread out so this has no effect. - // For full-table reads, this prevents everything from merging into 1 huge range. - let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum(); - let max_merge_bytes = if concurrency > 1 { - (total_bytes / concurrency as u64).max(1) - } else { - u64::MAX - }; - - let fetch_ranges = merge_byte_ranges(&ranges, coalesce_bytes, max_merge_bytes); + // Two-phase range optimization: + // Phase 1: Merge nearby ranges based on coalesce threshold. + let coalesced = merge_byte_ranges(&ranges, coalesce_bytes); + // Phase 2: Split large merged ranges to utilize concurrency. + let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency); // Fetch merged ranges concurrently. let r = &self.r; let fetched: Vec = if fetch_ranges.len() <= concurrency { // All ranges fit within the concurrency limit — fire them all at once. futures::future::try_join_all(fetch_ranges.iter().map(|range| { - r.read(range.clone()).map_err(|e| { - parquet::errors::ParquetError::External(format!("{e}").into()) - }) + r.read(range.clone()) + .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into())) })) .await? } else { @@ -912,44 +909,74 @@ impl AsyncFileReader for ArrowFileReader { // Range coalescing // --------------------------------------------------------------------------- -/// Merge nearby byte ranges to reduce the number of object-store requests. 
+/// Merge nearby byte ranges to reduce the number of requests. /// /// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range. /// The input does not need to be sorted. -fn merge_byte_ranges(ranges: &[Range], coalesce: u64, max_merge_bytes: u64) -> Vec> { - if ranges.is_empty() { - return vec![]; - } - - let mut sorted = ranges.to_vec(); - sorted.sort_unstable_by_key(|r| r.start); - - let mut merged = Vec::with_capacity(sorted.len()); - let mut start_idx = 0; - let mut end_idx = 1; - - while start_idx != sorted.len() { - let mut range_end = sorted[start_idx].end; - - while end_idx != sorted.len() - && sorted[end_idx] - .start - .checked_sub(range_end) - .map(|delta| delta <= coalesce) - .unwrap_or(true) - && (sorted[end_idx].end - sorted[start_idx].start) <= max_merge_bytes - { - range_end = range_end.max(sorted[end_idx].end); - end_idx += 1; - } - - merged.push(sorted[start_idx].start..range_end); - start_idx = end_idx; - end_idx += 1; - } - - merged - } +fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut sorted = ranges.to_vec(); + sorted.sort_unstable_by_key(|r| r.start); + + let mut merged = Vec::with_capacity(sorted.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != sorted.len() { + let mut range_end = sorted[start_idx].end; + + while end_idx != sorted.len() + && sorted[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(sorted[end_idx].end); + end_idx += 1; + } + + merged.push(sorted[start_idx].start..range_end); + start_idx = end_idx; + end_idx += 1; + } + + merged +} + +fn split_ranges_for_concurrency(ranges: Vec>, target_count: usize) -> Vec> { + if ranges.is_empty() || target_count <= 1 || ranges.len() >= target_count { + return ranges; + } + + let mut result = ranges; + + while result.len() < target_count { + // Find the largest range by byte size. 
+ let (largest_idx, largest_range) = result + .iter() + .enumerate() + .max_by_key(|(_, r)| r.end - r.start) + .expect("result is non-empty"); + + let range_size = largest_range.end - largest_range.start; + if range_size <= 1 { + break; + } + + let mid = largest_range.start + range_size / 2; + let left = largest_range.start..mid; + let right = mid..largest_range.end; + + result[largest_idx] = left; + result.insert(largest_idx + 1, right); + } + + result +} // --------------------------------------------------------------------------- // Tests @@ -1047,13 +1074,6 @@ mod tests { assert_eq!(merged, vec![0..600]); } - #[test] - fn test_merge_byte_ranges_single() { - let ranges = vec![100..200]; - let merged = super::merge_byte_ranges(&ranges, 1024); - assert_eq!(merged, vec![100..200]); - } - #[test] fn test_merge_byte_ranges_zero_coalesce_adjacent() { // With coalesce=0, adjacent ranges (gap=0) should still merge @@ -1069,4 +1089,52 @@ mod tests { let merged = super::merge_byte_ranges(&ranges, 0); assert_eq!(merged, vec![0..100, 101..200]); } + + // ----------------------------------------------------------------------- + // split_ranges_for_concurrency tests + // ----------------------------------------------------------------------- + + #[test] + fn test_split_single_range() { + // One large range split into 4 + let ranges = vec![0..1000]; + let result = super::split_ranges_for_concurrency(ranges, 4); + assert_eq!(result.len(), 4); + // All ranges should be contiguous and cover 0..1000 + assert_eq!(result[0].start, 0); + assert_eq!(result.last().unwrap().end, 1000); + for window in result.windows(2) { + assert_eq!(window[0].end, window[1].start); + } + } + + #[test] + fn test_split_mixed_sizes() { + // One large range + one small range, target=4 + // Should split the large range, leave the small one alone + let ranges = vec![0..1000, 2000..2010]; + let result = super::split_ranges_for_concurrency(ranges, 4); + assert_eq!(result.len(), 4); + // The small range 
(2000..2010) should remain intact + assert!(result.contains(&(2000..2010))); + } + + #[test] + fn test_split_empty() { + let ranges: Vec> = vec![]; + let result = super::split_ranges_for_concurrency(ranges, 4); + assert!(result.is_empty()); + } + + #[test] + fn test_split_clustered_and_sparse() { + // Simulates clustered + sparse columns: + // Clustered group merged into 0..400, sparse columns at 1000 and 2000 + let ranges = vec![0..400, 1000..1010, 2000..2010]; + let result = super::split_ranges_for_concurrency(ranges, 4); + assert_eq!(result.len(), 4); + // The large range should be split, small ones preserved + assert!(result.contains(&(1000..1010))); + assert!(result.contains(&(2000..2010))); + } } diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs index 9e7a5a5d..6233eb10 100644 --- a/crates/paimon/src/catalog/rest/rest_token_file_io.rs +++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs @@ -93,14 +93,8 @@ impl RESTTokenFileIO { match token_guard.as_ref() { Some(token) => { // Merge catalog options (base) with token credentials (override) - // token.token["fs.oss.endpoint"] = oss-cn-hangzhou.aliyuncs.com - let mut token_with_endpoint = token.token.clone(); - token_with_endpoint.insert( - "fs.oss.endpoint".to_string(), - "oss-cn-hangzhou.aliyuncs.com".to_string(), - ); - let base = self.catalog_options.to_map().clone(); - let merged_props = RESTUtil::merge(Some(&base), Some(&token_with_endpoint)); + let merged_props = + RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token)); // Build FileIO with merged properties let mut builder = FileIO::from_path(&self.path)?; builder = builder.with_props(merged_props); From 3f10d8188dcf0d3f68dd94638d82ba7debce3ace Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 10:40:58 +0800 Subject: [PATCH 07/24] fmt --- crates/paimon/src/arrow/format/parquet.rs | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) 
diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 2af72020..2964f515 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -1060,28 +1060,6 @@ mod tests { assert_eq!(merged, vec![0..600]); } - #[test] - fn test_merge_byte_ranges_overlapping() { - let ranges = vec![0..200, 100..300]; - let merged = super::merge_byte_ranges(&ranges, 0); - assert_eq!(merged, vec![0..300]); - } - - #[test] - fn test_merge_byte_ranges_unsorted() { - let ranges = vec![500..600, 0..100, 200..300]; - let merged = super::merge_byte_ranges(&ranges, 1024); - assert_eq!(merged, vec![0..600]); - } - - #[test] - fn test_merge_byte_ranges_zero_coalesce_adjacent() { - // With coalesce=0, adjacent ranges (gap=0) should still merge - let ranges = vec![0..100, 100..200]; - let merged = super::merge_byte_ranges(&ranges, 0); - assert_eq!(merged, vec![0..200]); - } - #[test] fn test_merge_byte_ranges_zero_coalesce_gap() { // With coalesce=0, ranges with a 1-byte gap should NOT merge @@ -1097,6 +1075,7 @@ mod tests { #[test] fn test_split_single_range() { // One large range split into 4 + #[allow(clippy::single_range_in_vec_init)] let ranges = vec![0..1000]; let result = super::split_ranges_for_concurrency(ranges, 4); assert_eq!(result.len(), 4); From e160e7ad60bd3d8779048cbd7a48f2b598b889fa Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 11:44:28 +0800 Subject: [PATCH 08/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 121 +++++++++++++++------- 1 file changed, 85 insertions(+), 36 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 2964f515..ba405602 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -846,8 +846,9 @@ impl AsyncFileReader for ArrowFileReader { // Two-phase range optimization: // Phase 1: Merge nearby ranges based on coalesce threshold. 
let coalesced = merge_byte_ranges(&ranges, coalesce_bytes); - // Phase 2: Split large merged ranges to utilize concurrency. - let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency); + // Phase 2: Split large merged ranges to utilize concurrency, + // but only at original range boundaries. + let fetch_ranges = split_ranges_for_concurrency(coalesced, &ranges, concurrency); // Fetch merged ranges concurrently. let r = &self.r; @@ -947,32 +948,51 @@ fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { merged } -fn split_ranges_for_concurrency(ranges: Vec>, target_count: usize) -> Vec> { - if ranges.is_empty() || target_count <= 1 || ranges.len() >= target_count { - return ranges; +/// Split merged ranges to utilize concurrency by repeatedly bisecting the +/// largest range at the nearest original-range boundary. This guarantees +/// every original range stays fully inside one fetch range. +fn split_ranges_for_concurrency( + merged: Vec>, + original: &[Range], + target_count: usize, +) -> Vec> { + if merged.is_empty() || target_count <= 1 || merged.len() >= target_count { + return merged; } - let mut result = ranges; + // Collect all original-range start points as candidate split boundaries. + let mut boundaries: Vec = original.iter().map(|r| r.start).collect(); + boundaries.sort_unstable(); + boundaries.dedup(); + + let mut result = merged; while result.len() < target_count { - // Find the largest range by byte size. - let (largest_idx, largest_range) = result + // Pick the largest range. + let (idx, _) = result .iter() .enumerate() .max_by_key(|(_, r)| r.end - r.start) - .expect("result is non-empty"); + .unwrap(); - let range_size = largest_range.end - largest_range.start; - if range_size <= 1 { - break; - } + let range = &result[idx]; + let mid = range.start + (range.end - range.start) / 2; + + // Find the boundary closest to the midpoint that actually splits. 
+ let best = boundaries + .iter() + .copied() + .filter(|&b| b > range.start && b < range.end) + .min_by_key(|&b| (b as i64 - mid as i64).unsigned_abs()); - let mid = largest_range.start + range_size / 2; - let left = largest_range.start..mid; - let right = mid..largest_range.end; + let Some(split_at) = best else { + break; // No valid split point in the largest range; stop. + }; - result[largest_idx] = left; - result.insert(largest_idx + 1, right); + let left = range.start..split_at; + let right = split_at..range.end; + result[idx] = left; + result.insert(idx + 1, right); } result @@ -1074,46 +1094,75 @@ mod tests { #[test] fn test_split_single_range() { - // One large range split into 4 + // One merged range from a single original — no boundary to split at. #[allow(clippy::single_range_in_vec_init)] - let ranges = vec![0..1000]; - let result = super::split_ranges_for_concurrency(ranges, 4); + let merged = vec![0..1000]; + let original = vec![0..1000]; + let result = super::split_ranges_for_concurrency(merged, &original, 4); + assert_eq!(result.len(), 1); + assert_eq!(result[0], 0..1000); + } + + #[test] + fn test_split_single_range_multiple_originals() { + // One merged range containing 4 originals — bisect at boundaries. 
+ let original = vec![0..200, 250..500, 550..750, 800..1000]; + let merged = vec![0..1000]; + let result = super::split_ranges_for_concurrency(merged, &original, 4); assert_eq!(result.len(), 4); - // All ranges should be contiguous and cover 0..1000 assert_eq!(result[0].start, 0); assert_eq!(result.last().unwrap().end, 1000); for window in result.windows(2) { assert_eq!(window[0].end, window[1].start); } + for orig in &original { + assert!( + result + .iter() + .any(|r| r.start <= orig.start && r.end >= orig.end), + "original {orig:?} not fully contained" + ); + } } #[test] fn test_split_mixed_sizes() { - // One large range + one small range, target=4 - // Should split the large range, leave the small one alone - let ranges = vec![0..1000, 2000..2010]; - let result = super::split_ranges_for_concurrency(ranges, 4); - assert_eq!(result.len(), 4); - // The small range (2000..2010) should remain intact + let original = vec![0..300, 400..700, 800..1000, 2000..2010]; + let merged = vec![0..1000, 2000..2010]; + let result = super::split_ranges_for_concurrency(merged, &original, 4); assert!(result.contains(&(2000..2010))); + for orig in &original { + assert!( + result + .iter() + .any(|r| r.start <= orig.start && r.end >= orig.end), + "original {orig:?} not fully contained" + ); + } } #[test] fn test_split_empty() { - let ranges: Vec> = vec![]; - let result = super::split_ranges_for_concurrency(ranges, 4); + let merged: Vec> = vec![]; + let original: Vec> = vec![]; + let result = super::split_ranges_for_concurrency(merged, &original, 4); assert!(result.is_empty()); } #[test] fn test_split_clustered_and_sparse() { - // Simulates clustered + sparse columns: - // Clustered group merged into 0..400, sparse columns at 1000 and 2000 - let ranges = vec![0..400, 1000..1010, 2000..2010]; - let result = super::split_ranges_for_concurrency(ranges, 4); - assert_eq!(result.len(), 4); - // The large range should be split, small ones preserved + let original = vec![0..100, 150..250, 
300..400, 1000..1010, 2000..2010]; + let merged = vec![0..400, 1000..1010, 2000..2010]; + let result = super::split_ranges_for_concurrency(merged, &original, 5); assert!(result.contains(&(1000..1010))); assert!(result.contains(&(2000..2010))); + for orig in &original { + assert!( + result + .iter() + .any(|r| r.start <= orig.start && r.end >= orig.end), + "original {orig:?} not fully contained" + ); + } } } From 637163d1e33b1b6d9a91e6c44a7092a070c01cf7 Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 11:56:18 +0800 Subject: [PATCH 09/24] fmt --- crates/paimon/src/arrow/format/parquet.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index ba405602..fbb559e6 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -1097,6 +1097,7 @@ mod tests { // One merged range from a single original — no boundary to split at. #[allow(clippy::single_range_in_vec_init)] let merged = vec![0..1000]; + #[allow(clippy::single_range_in_vec_init)] let original = vec![0..1000]; let result = super::split_ranges_for_concurrency(merged, &original, 4); assert_eq!(result.len(), 1); @@ -1107,6 +1108,7 @@ mod tests { fn test_split_single_range_multiple_originals() { // One merged range containing 4 originals — bisect at boundaries. 
let original = vec![0..200, 250..500, 550..750, 800..1000]; + #[allow(clippy::single_range_in_vec_init)] let merged = vec![0..1000]; let result = super::split_ranges_for_concurrency(merged, &original, 4); assert_eq!(result.len(), 4); From 1a89cba01d313328f4b38ad1ac25ab41daa3c18b Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 13:35:06 +0800 Subject: [PATCH 10/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 29 ++++++----------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index fbb559e6..be843ccb 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -783,33 +783,18 @@ fn build_row_ranges_selection( struct ArrowFileReader { file_size: u64, r: Box, - /// Maximum gap (in bytes) between two ranges that will be merged into a - /// single fetch request. Defaults to 1 MiB. - range_coalesce_bytes: u64, - /// Maximum number of merged ranges to fetch concurrently. Defaults to 8. - range_fetch_concurrency: usize, - /// Hint for the number of bytes to speculatively read from the end of the - /// file when loading Parquet metadata. A sufficiently large hint reduces - /// footer loading from 2 round-trips to 1. Defaults to 512 KiB. - metadata_size_hint: Option, } /// Default coalesce threshold: 1 MiB. -const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; +const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; /// Default concurrent range fetches. -const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 8; +const RANGE_FETCH_CONCURRENCY: usize = 8; /// Default metadata prefetch hint: 512 KiB. 
-const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024; +const METADATA_SIZE_HINT: usize = 512 * 1024; impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { - Self { - file_size, - r, - range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, - range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, - metadata_size_hint: Some(DEFAULT_METADATA_SIZE_HINT), - } + Self { file_size, r } } fn read_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { @@ -835,8 +820,8 @@ impl AsyncFileReader for ArrowFileReader { &mut self, ranges: Vec>, ) -> BoxFuture<'_, parquet::errors::Result>> { - let coalesce_bytes = self.range_coalesce_bytes; - let concurrency = self.range_fetch_concurrency.max(1); + let coalesce_bytes = RANGE_COALESCE_BYTES; + let concurrency = RANGE_FETCH_CONCURRENCY; async move { if ranges.is_empty() { @@ -893,7 +878,7 @@ impl AsyncFileReader for ArrowFileReader { options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result>> { let metadata_opts = options.map(|o| o.metadata_options().clone()); - let prefetch_hint = self.metadata_size_hint; + let prefetch_hint = Some(METADATA_SIZE_HINT); Box::pin(async move { let file_size = self.file_size; let metadata = ParquetMetaDataReader::new() From 69de7f4df47940a6bc18966ab7489ac161ba396c Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 14:33:30 +0800 Subject: [PATCH 11/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 74 +++++++++++++---------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index be843ccb..3933b0e8 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -791,6 +791,11 @@ const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; const RANGE_FETCH_CONCURRENCY: usize = 8; /// Default metadata prefetch hint: 512 KiB. const METADATA_SIZE_HINT: usize = 512 * 1024; +/// Minimum range size for splitting: 4 MiB. 
+/// Matches Java Paimon's `batchSizeForVectorReads` default. +/// Ranges smaller than this will not be split further to avoid +/// excessive small IO requests whose per-request overhead dominates. +const MIN_SPLIT_SIZE: u64 = 4 * 1024 * 1024; impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { @@ -858,17 +863,36 @@ impl AsyncFileReader for ArrowFileReader { }; // Slice the fetched data back into the originally requested ranges. - Ok(ranges + let result: parquet::errors::Result> = ranges .iter() .map(|range| { - let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let pp = fetch_ranges.partition_point(|v| v.start <= range.start); + let idx = pp.checked_sub(1).ok_or_else(|| { + parquet::errors::ParquetError::General(format!( + "No fetch range covers requested range {}..{}", + range.start, range.end + )) + })?; let fetch_range = &fetch_ranges[idx]; let fetch_bytes = &fetched[idx]; let start = (range.start - fetch_range.start) as usize; let end = (range.end - fetch_range.start) as usize; - fetch_bytes.slice(start..end.min(fetch_bytes.len())) + if end > fetch_bytes.len() { + return Err(parquet::errors::ParquetError::General(format!( + "Fetched data too short for range {}..{}: \ + expected at least {} bytes from fetch range {}..{}, got {}", + range.start, + range.end, + end, + fetch_range.start, + fetch_range.end, + fetch_bytes.len() + ))); + } + Ok(fetch_bytes.slice(start..end)) }) - .collect()) + .collect(); + result } .boxed() } @@ -954,24 +978,35 @@ fn split_ranges_for_concurrency( while result.len() < target_count { // Pick the largest range. - let (idx, _) = result + let (idx, largest) = result .iter() .enumerate() .max_by_key(|(_, r)| r.end - r.start) .unwrap(); + let largest_size = largest.end - largest.start; + + // Don't split if the range is smaller than 2 * MIN_SPLIT_SIZE, + // because both halves would end up below the batch threshold. 
+ if largest_size < MIN_SPLIT_SIZE * 2 { + break; + } + let range = &result[idx]; + // Each half must be at least MIN_SPLIT_SIZE. + let expected_size = MIN_SPLIT_SIZE.max(largest_size / target_count as u64 + 1); let mid = range.start + (range.end - range.start) / 2; - // Find the boundary closest to the midpoint that actually splits. let best = boundaries .iter() .copied() - .filter(|&b| b > range.start && b < range.end) + .filter(|&b| { + b >= range.start + expected_size && b <= range.end.saturating_sub(expected_size) + }) .min_by_key(|&b| (b as i64 - mid as i64).unsigned_abs()); let Some(split_at) = best else { - break; // No valid split point in the largest range; stop. + break; // No valid split point that keeps both halves large enough. }; let left = range.start..split_at; @@ -1089,29 +1124,6 @@ mod tests { assert_eq!(result[0], 0..1000); } - #[test] - fn test_split_single_range_multiple_originals() { - // One merged range containing 4 originals — bisect at boundaries. - let original = vec![0..200, 250..500, 550..750, 800..1000]; - #[allow(clippy::single_range_in_vec_init)] - let merged = vec![0..1000]; - let result = super::split_ranges_for_concurrency(merged, &original, 4); - assert_eq!(result.len(), 4); - assert_eq!(result[0].start, 0); - assert_eq!(result.last().unwrap().end, 1000); - for window in result.windows(2) { - assert_eq!(window[0].end, window[1].start); - } - for orig in &original { - assert!( - result - .iter() - .any(|r| r.start <= orig.start && r.end >= orig.end), - "original {orig:?} not fully contained" - ); - } - } - #[test] fn test_split_mixed_sizes() { let original = vec![0..300, 400..700, 800..1000, 2000..2010]; From 8b6597a36b939cb8255852d7c4771a4939f48aad Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 14:36:00 +0800 Subject: [PATCH 12/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 
3933b0e8..95b65dac 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -792,7 +792,6 @@ const RANGE_FETCH_CONCURRENCY: usize = 8; /// Default metadata prefetch hint: 512 KiB. const METADATA_SIZE_HINT: usize = 512 * 1024; /// Minimum range size for splitting: 4 MiB. -/// Matches Java Paimon's `batchSizeForVectorReads` default. /// Ranges smaller than this will not be split further to avoid /// excessive small IO requests whose per-request overhead dominates. const MIN_SPLIT_SIZE: u64 = 4 * 1024 * 1024; From d6e440ffba9b61af66625bdd769d2fa195e6c433 Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 14:37:06 +0800 Subject: [PATCH 13/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 95b65dac..905777e7 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -788,7 +788,7 @@ struct ArrowFileReader { /// Default coalesce threshold: 1 MiB. const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; /// Default concurrent range fetches. -const RANGE_FETCH_CONCURRENCY: usize = 8; +const RANGE_FETCH_CONCURRENCY: usize = 10; /// Default metadata prefetch hint: 512 KiB. const METADATA_SIZE_HINT: usize = 512 * 1024; /// Minimum range size for splitting: 4 MiB. From 01495100cf3a9d8c2eb605cea0c7652d6a60df2f Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 14:37:49 +0800 Subject: [PATCH 14/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 905777e7..51451231 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -785,11 +785,11 @@ struct ArrowFileReader { r: Box, } -/// Default coalesce threshold: 1 MiB. 
+/// coalesce threshold: 1 MiB. const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; -/// Default concurrent range fetches. +/// concurrent range fetches. const RANGE_FETCH_CONCURRENCY: usize = 10; -/// Default metadata prefetch hint: 512 KiB. +/// metadata prefetch hint: 512 KiB. const METADATA_SIZE_HINT: usize = 512 * 1024; /// Minimum range size for splitting: 4 MiB. /// Ranges smaller than this will not be split further to avoid From 66b8fa881c7849af9dc30e4497fa6aca0db84e1d Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 17:04:29 +0800 Subject: [PATCH 15/24] fixSplit --- crates/paimon/src/arrow/format/parquet.rs | 180 ++++++++++------------ 1 file changed, 85 insertions(+), 95 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 51451231..c189f4be 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -861,34 +861,71 @@ impl AsyncFileReader for ArrowFileReader { .await? }; - // Slice the fetched data back into the originally requested ranges. + // Slice the fetched data back into the originally requested + // ranges. A single original range may span multiple fetch + // chunks (the Java `copyMultiBytesToBytes` approach), so we + // copy from as many chunks as needed. let result: parquet::errors::Result> = ranges .iter() .map(|range| { - let pp = fetch_ranges.partition_point(|v| v.start <= range.start); - let idx = pp.checked_sub(1).ok_or_else(|| { - parquet::errors::ParquetError::General(format!( + // Find the first fetch chunk whose end is past range.start. 
+ let first = fetch_ranges.partition_point(|v| v.end <= range.start); + if first >= fetch_ranges.len() { + return Err(parquet::errors::ParquetError::General(format!( "No fetch range covers requested range {}..{}", range.start, range.end - )) - })?; - let fetch_range = &fetch_ranges[idx]; - let fetch_bytes = &fetched[idx]; - let start = (range.start - fetch_range.start) as usize; - let end = (range.end - fetch_range.start) as usize; - if end > fetch_bytes.len() { + ))); + } + + let need = (range.end - range.start) as usize; + + // Fast path: the original range fits entirely within one + // fetch chunk — zero-copy slice. + let fr = &fetch_ranges[first]; + if range.end <= fr.end { + let start = (range.start - fr.start) as usize; + let end = (range.end - fr.start) as usize; + return Ok(fetched[first].slice(start..end)); + } + + // Slow path: the original range spans multiple fetch + // chunks — copy pieces into a new buffer (mirrors Java's + // copyMultiBytesToBytes). + let mut buf = Vec::with_capacity(need); + let mut pos = range.start; + for i in first..fetch_ranges.len() { + if pos >= range.end { + break; + } + let fr = &fetch_ranges[i]; + let chunk = &fetched[i]; + let src_start = (pos - fr.start) as usize; + let src_end = ((range.end.min(fr.end)) - fr.start) as usize; + if src_end > chunk.len() { + return Err(parquet::errors::ParquetError::General(format!( + "Fetched data too short for range {}..{}: \ + chunk {}..{} has {} bytes, need up to offset {}", + range.start, + range.end, + fr.start, + fr.end, + chunk.len(), + src_end, + ))); + } + buf.extend_from_slice(&chunk[src_start..src_end]); + pos = fr.end; + } + if buf.len() != need { return Err(parquet::errors::ParquetError::General(format!( - "Fetched data too short for range {}..{}: \ - expected at least {} bytes from fetch range {}..{}, got {}", + "Assembled {} bytes for range {}..{}, expected {}", + buf.len(), range.start, range.end, - end, - fetch_range.start, - fetch_range.end, - fetch_bytes.len() + need, 
))); } - Ok(fetch_bytes.slice(start..end)) + Ok(Bytes::from(buf)) }) .collect(); result @@ -956,62 +993,47 @@ fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { merged } -/// Split merged ranges to utilize concurrency by repeatedly bisecting the -/// largest range at the nearest original-range boundary. This guarantees -/// every original range stays fully inside one fetch range. +/// Split merged ranges into fixed-size batches to utilize concurrency, +/// Each merged range is divided into chunks of `expected_size`, +/// with the last chunk taking whatever remains. +/// Ranges smaller than `2 * MIN_SPLIT_SIZE` are kept as-is to +/// avoid excessive small IO requests. fn split_ranges_for_concurrency( merged: Vec>, - original: &[Range], + _original: &[Range], target_count: usize, ) -> Vec> { - if merged.is_empty() || target_count <= 1 || merged.len() >= target_count { + if merged.is_empty() || target_count <= 1 { return merged; } - // Collect all original-range start points as candidate split boundaries. - let mut boundaries: Vec = original.iter().map(|r| r.start).collect(); - boundaries.sort_unstable(); - boundaries.dedup(); - - let mut result = merged; - - while result.len() < target_count { - // Pick the largest range. - let (idx, largest) = result - .iter() - .enumerate() - .max_by_key(|(_, r)| r.end - r.start) - .unwrap(); + let mut result = Vec::with_capacity(merged.len()); - let largest_size = largest.end - largest.start; + for range in &merged { + let length = range.end - range.start; - // Don't split if the range is smaller than 2 * MIN_SPLIT_SIZE, - // because both halves would end up below the batch threshold. - if largest_size < MIN_SPLIT_SIZE * 2 { - break; + if length < MIN_SPLIT_SIZE * 2 { + result.push(range.clone()); + continue; } - let range = &result[idx]; - // Each half must be at least MIN_SPLIT_SIZE. 
- let expected_size = MIN_SPLIT_SIZE.max(largest_size / target_count as u64 + 1); - let mid = range.start + (range.end - range.start) / 2; + let expected_size = MIN_SPLIT_SIZE.max(length / target_count as u64 + 1); + let min_remain = expected_size.max(MIN_SPLIT_SIZE * 2); - let best = boundaries - .iter() - .copied() - .filter(|&b| { - b >= range.start + expected_size && b <= range.end.saturating_sub(expected_size) - }) - .min_by_key(|&b| (b as i64 - mid as i64).unsigned_abs()); + let mut offset = range.start; + let end = range.end; - let Some(split_at) = best else { - break; // No valid split point that keeps both halves large enough. - }; - - let left = range.start..split_at; - let right = split_at..range.end; - result[idx] = left; - result.insert(idx + 1, right); + loop { + if offset + min_remain > end { + if offset < end { + result.push(offset..end); + } + break; + } else { + result.push(offset..offset + expected_size); + offset += expected_size; + } + } } result @@ -1112,8 +1134,8 @@ mod tests { // ----------------------------------------------------------------------- #[test] - fn test_split_single_range() { - // One merged range from a single original — no boundary to split at. + fn test_split_single_small_range() { + // A single range smaller than 2 * MIN_SPLIT_SIZE should not be split. 
#[allow(clippy::single_range_in_vec_init)] let merged = vec![0..1000]; #[allow(clippy::single_range_in_vec_init)] @@ -1123,21 +1145,6 @@ mod tests { assert_eq!(result[0], 0..1000); } - #[test] - fn test_split_mixed_sizes() { - let original = vec![0..300, 400..700, 800..1000, 2000..2010]; - let merged = vec![0..1000, 2000..2010]; - let result = super::split_ranges_for_concurrency(merged, &original, 4); - assert!(result.contains(&(2000..2010))); - for orig in &original { - assert!( - result - .iter() - .any(|r| r.start <= orig.start && r.end >= orig.end), - "original {orig:?} not fully contained" - ); - } - } #[test] fn test_split_empty() { @@ -1146,21 +1153,4 @@ mod tests { let result = super::split_ranges_for_concurrency(merged, &original, 4); assert!(result.is_empty()); } - - #[test] - fn test_split_clustered_and_sparse() { - let original = vec![0..100, 150..250, 300..400, 1000..1010, 2000..2010]; - let merged = vec![0..400, 1000..1010, 2000..2010]; - let result = super::split_ranges_for_concurrency(merged, &original, 5); - assert!(result.contains(&(1000..1010))); - assert!(result.contains(&(2000..2010))); - for orig in &original { - assert!( - result - .iter() - .any(|r| r.start <= orig.start && r.end >= orig.end), - "original {orig:?} not fully contained" - ); - } - } } From cc2032fd1d7a3eab64cd6a86f4409bb79125fa46 Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 17:08:08 +0800 Subject: [PATCH 16/24] fixSplit --- crates/paimon/src/arrow/format/parquet.rs | 27 +++++++++++++++-------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index c189f4be..804e0cec 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -837,7 +837,7 @@ impl AsyncFileReader for ArrowFileReader { let coalesced = merge_byte_ranges(&ranges, coalesce_bytes); // Phase 2: Split large merged ranges to utilize concurrency, // but only at 
original range boundaries. - let fetch_ranges = split_ranges_for_concurrency(coalesced, &ranges, concurrency); + let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency); // Fetch merged ranges concurrently. let r = &self.r; @@ -863,8 +863,7 @@ impl AsyncFileReader for ArrowFileReader { // Slice the fetched data back into the originally requested // ranges. A single original range may span multiple fetch - // chunks (the Java `copyMultiBytesToBytes` approach), so we - // copy from as many chunks as needed. + // chunks, so we copy from as many chunks as needed. let result: parquet::errors::Result> = ranges .iter() .map(|range| { @@ -1000,7 +999,6 @@ fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { /// avoid excessive small IO requests. fn split_ranges_for_concurrency( merged: Vec>, - _original: &[Range], target_count: usize, ) -> Vec> { if merged.is_empty() || target_count <= 1 { @@ -1138,19 +1136,30 @@ mod tests { // A single range smaller than 2 * MIN_SPLIT_SIZE should not be split. 
#[allow(clippy::single_range_in_vec_init)] let merged = vec![0..1000]; - #[allow(clippy::single_range_in_vec_init)] - let original = vec![0..1000]; - let result = super::split_ranges_for_concurrency(merged, &original, 4); + let result = super::split_ranges_for_concurrency(merged, 4); assert_eq!(result.len(), 1); assert_eq!(result[0], 0..1000); } + #[test] + fn test_split_large_range_into_batches() { + let mb = 1024 * 1024u64; + let size = 40 * mb; + #[allow(clippy::single_range_in_vec_init)] + let merged = vec![0..size]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert!(result.len() > 1); + assert_eq!(result.first().unwrap().start, 0); + assert_eq!(result.last().unwrap().end, size); + for i in 1..result.len() { + assert_eq!(result[i].start, result[i - 1].end); + } + } #[test] fn test_split_empty() { let merged: Vec> = vec![]; - let original: Vec> = vec![]; - let result = super::split_ranges_for_concurrency(merged, &original, 4); + let result = super::split_ranges_for_concurrency(merged, 4); assert!(result.is_empty()); } } From 3bc6a4e6b62ebaf701cf9c21f2c0433c404c8db3 Mon Sep 17 00:00:00 2001 From: umi Date: Thu, 9 Apr 2026 17:22:00 +0800 Subject: [PATCH 17/24] fmt --- crates/paimon/src/arrow/format/parquet.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 804e0cec..cf470b5f 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -993,14 +993,11 @@ fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { } /// Split merged ranges into fixed-size batches to utilize concurrency, -/// Each merged range is divided into chunks of `expected_size`, -/// with the last chunk taking whatever remains. +/// Each merged range is divided into chunks of `expected_size`, +/// with the last chunk taking whatever remains. 
/// Ranges smaller than `2 * MIN_SPLIT_SIZE` are kept as-is to /// avoid excessive small IO requests. -fn split_ranges_for_concurrency( - merged: Vec>, - target_count: usize, -) -> Vec> { +fn split_ranges_for_concurrency(merged: Vec>, target_count: usize) -> Vec> { if merged.is_empty() || target_count <= 1 { return merged; } @@ -1009,12 +1006,6 @@ fn split_ranges_for_concurrency( for range in &merged { let length = range.end - range.start; - - if length < MIN_SPLIT_SIZE * 2 { - result.push(range.clone()); - continue; - } - let expected_size = MIN_SPLIT_SIZE.max(length / target_count as u64 + 1); let min_remain = expected_size.max(MIN_SPLIT_SIZE * 2); From ebcbd1997a95236aeb4290c9aa6f4ef7b0a4ebf4 Mon Sep 17 00:00:00 2001 From: umi Date: Fri, 10 Apr 2026 16:15:29 +0800 Subject: [PATCH 18/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 116 ++++++++++++++---- .../src/catalog/rest/rest_token_file_io.rs | 10 +- 2 files changed, 102 insertions(+), 24 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index cf470b5f..debf4094 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -997,8 +997,8 @@ fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { /// with the last chunk taking whatever remains. /// Ranges smaller than `2 * MIN_SPLIT_SIZE` are kept as-is to /// avoid excessive small IO requests. 
-fn split_ranges_for_concurrency(merged: Vec>, target_count: usize) -> Vec> { - if merged.is_empty() || target_count <= 1 { +fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> Vec> { + if merged.is_empty() || concurrency <= 1 { return merged; } @@ -1006,17 +1006,31 @@ fn split_ranges_for_concurrency(merged: Vec>, target_count: usize) -> for range in &merged { let length = range.end - range.start; - let expected_size = MIN_SPLIT_SIZE.max(length / target_count as u64 + 1); - let min_remain = expected_size.max(MIN_SPLIT_SIZE * 2); + let raw_size = MIN_SPLIT_SIZE.max(length / concurrency as u64 + 1); + // Round up to the nearest multiple of MIN_SPLIT_SIZE (4 MB) so that + // every split boundary is 4 MB-aligned relative to the range start. + let expected_size = raw_size.div_ceil(MIN_SPLIT_SIZE) * MIN_SPLIT_SIZE; + let min_tail_size = expected_size.max(MIN_SPLIT_SIZE * 2); let mut offset = range.start; let end = range.end; + // Align the first split boundary: if `offset` is not 4 MB-aligned, + // emit a short head chunk so that all subsequent chunks start on a + // 4 MB boundary. + let misalign = offset % MIN_SPLIT_SIZE; + if misalign != 0 { + let first_end = (offset - misalign + MIN_SPLIT_SIZE).min(end); + result.push(offset..first_end); + offset = first_end; + } + loop { - if offset + min_remain > end { - if offset < end { - result.push(offset..end); - } + if offset >= end { + break; + } + if end - offset < min_tail_size { + result.push(offset..end); break; } else { result.push(offset..offset + expected_size); @@ -1123,28 +1137,86 @@ mod tests { // ----------------------------------------------------------------------- #[test] - fn test_split_single_small_range() { - // A single range smaller than 2 * MIN_SPLIT_SIZE should not be split. 
+ fn test_split_aligned_range_0_to_20mb() { + // 0..20MB, concurrency=4: + // raw_size = max(4MB, 5MB+1) = 5MB+1 + // expected_size = ceil((5MB+1)/4MB)*4MB = 8MB + // min_tail_size = max(8MB, 8MB) = 8MB + // No misalign. Chunks: [0..8, 8..16, 16..20] + let mb = 1024 * 1024u64; #[allow(clippy::single_range_in_vec_init)] - let merged = vec![0..1000]; + let merged = vec![0..20 * mb]; let result = super::split_ranges_for_concurrency(merged, 4); - assert_eq!(result.len(), 1); - assert_eq!(result[0], 0..1000); + assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]); } #[test] - fn test_split_large_range_into_batches() { + fn test_split_unaligned_start_6_to_14mb() { + // 6MB..14MB, concurrency=4: + // raw_size = max(4MB, 2MB+1) = 4MB + // expected_size = 4MB, min_tail_size = 8MB + // Head: 6..8MB. Loop: 8+8=16 > 14 → tail 8..14. + // Result: [6..8, 8..14] let mb = 1024 * 1024u64; - let size = 40 * mb; #[allow(clippy::single_range_in_vec_init)] - let merged = vec![0..size]; + let merged = vec![6 * mb..14 * mb]; let result = super::split_ranges_for_concurrency(merged, 4); - assert!(result.len() > 1); - assert_eq!(result.first().unwrap().start, 0); - assert_eq!(result.last().unwrap().end, size); - for i in 1..result.len() { - assert_eq!(result[i].start, result[i - 1].end); - } + assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]); + } + + #[test] + fn test_split_unaligned_start_6_to_22mb() { + // 6MB..22MB, concurrency=4: + // raw_size = max(4MB, 4MB+1) = 4MB+1 + // expected_size = ceil((4MB+1)/4MB)*4MB = 8MB + // min_tail_size = 8MB + // Head: 6..8MB. Loop: 8+8=16 ≤ 22 → 8..16; 16+8=24 > 22 → tail 16..22. 
+ // Result: [6..8, 8..16, 16..22] + let mb = 1024 * 1024u64; + #[allow(clippy::single_range_in_vec_init)] + let merged = vec![6 * mb..22 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!( + result, + vec![6 * mb..8 * mb, 8 * mb..16 * mb, 16 * mb..22 * mb] + ); + } + + #[test] + fn test_split_already_aligned_8_to_24mb() { + // 8MB..24MB, concurrency=4: + // raw_size = max(4MB, 4MB+1) = 4MB+1 + // expected_size = 8MB, min_tail_size = 8MB + // No misalign. Loop: 8+8=16 ≤ 24 → 8..16; 16+8=24 ≤ 24 → 16..24; offset=24 >= end → break. + // Result: [8..16, 16..24] + let mb = 1024 * 1024u64; + #[allow(clippy::single_range_in_vec_init)] + let merged = vec![8 * mb..24 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!(result, vec![8 * mb..16 * mb, 16 * mb..24 * mb]); + } + + #[test] + fn test_split_multiple_ranges() { + // [0..20MB, 24..44MB], concurrency=4: + // Range 0..20MB → [0..8, 8..16, 16..20] (same as test above) + // Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign. + // 24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44. 
+ // Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44] + let mb = 1024 * 1024u64; + let merged = vec![0..20 * mb, 24 * mb..44 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!( + result, + vec![ + 0..8 * mb, + 8 * mb..16 * mb, + 16 * mb..20 * mb, + 24 * mb..32 * mb, + 32 * mb..40 * mb, + 40 * mb..44 * mb, + ] + ); } #[test] diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs index 6233eb10..9e7a5a5d 100644 --- a/crates/paimon/src/catalog/rest/rest_token_file_io.rs +++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs @@ -93,8 +93,14 @@ impl RESTTokenFileIO { match token_guard.as_ref() { Some(token) => { // Merge catalog options (base) with token credentials (override) - let merged_props = - RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token)); + // token.token["fs.oss.endpoint"] = oss-cn-hangzhou.aliyuncs.com + let mut token_with_endpoint = token.token.clone(); + token_with_endpoint.insert( + "fs.oss.endpoint".to_string(), + "oss-cn-hangzhou.aliyuncs.com".to_string(), + ); + let base = self.catalog_options.to_map().clone(); + let merged_props = RESTUtil::merge(Some(&base), Some(&token_with_endpoint)); // Build FileIO with merged properties let mut builder = FileIO::from_path(&self.path)?; builder = builder.with_props(merged_props); From 3176aa641a3dbc10c939200e58e9cab3c5480809 Mon Sep 17 00:00:00 2001 From: umi Date: Fri, 10 Apr 2026 16:38:14 +0800 Subject: [PATCH 19/24] rename --- crates/paimon/src/arrow/format/parquet.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index debf4094..6ca4130d 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -792,9 +792,10 @@ const RANGE_FETCH_CONCURRENCY: usize = 10; /// metadata prefetch hint: 512 KiB. 
const METADATA_SIZE_HINT: usize = 512 * 1024; /// Minimum range size for splitting: 4 MiB. -/// Ranges smaller than this will not be split further to avoid -/// excessive small IO requests whose per-request overhead dominates. -const MIN_SPLIT_SIZE: u64 = 4 * 1024 * 1024; +/// The block size used for split alignment and as the minimum split +/// granularity. Ranges smaller than this will not be split further to +/// avoid excessive small IO requests whose per-request overhead dominates. +const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024; impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { @@ -995,7 +996,7 @@ fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { /// Split merged ranges into fixed-size batches to utilize concurrency, /// Each merged range is divided into chunks of `expected_size`, /// with the last chunk taking whatever remains. -/// Ranges smaller than `2 * MIN_SPLIT_SIZE` are kept as-is to +/// Ranges smaller than `2 * IO_BLOCK_SIZE` are kept as-is to /// avoid excessive small IO requests. fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> Vec> { if merged.is_empty() || concurrency <= 1 { @@ -1006,11 +1007,11 @@ fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> for range in &merged { let length = range.end - range.start; - let raw_size = MIN_SPLIT_SIZE.max(length / concurrency as u64 + 1); - // Round up to the nearest multiple of MIN_SPLIT_SIZE (4 MB) so that + let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64)); + // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that // every split boundary is 4 MB-aligned relative to the range start. 
- let expected_size = raw_size.div_ceil(MIN_SPLIT_SIZE) * MIN_SPLIT_SIZE; - let min_tail_size = expected_size.max(MIN_SPLIT_SIZE * 2); + let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE; + let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2); let mut offset = range.start; let end = range.end; @@ -1018,9 +1019,9 @@ fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> // Align the first split boundary: if `offset` is not 4 MB-aligned, // emit a short head chunk so that all subsequent chunks start on a // 4 MB boundary. - let misalign = offset % MIN_SPLIT_SIZE; + let misalign = offset % IO_BLOCK_SIZE; if misalign != 0 { - let first_end = (offset - misalign + MIN_SPLIT_SIZE).min(end); + let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end); result.push(offset..first_end); offset = first_end; } From 94b048db0350b33e77f94f4bbf383ec1c3562f6d Mon Sep 17 00:00:00 2001 From: umi Date: Fri, 10 Apr 2026 16:44:22 +0800 Subject: [PATCH 20/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 37 ++++++++++++++++------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 6ca4130d..b0aa0ec2 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -1168,33 +1168,48 @@ mod tests { #[test] fn test_split_unaligned_start_6_to_22mb() { // 6MB..22MB, concurrency=4: - // raw_size = max(4MB, 4MB+1) = 4MB+1 - // expected_size = ceil((4MB+1)/4MB)*4MB = 8MB - // min_tail_size = 8MB - // Head: 6..8MB. Loop: 8+8=16 ≤ 22 → 8..16; 16+8=24 > 22 → tail 16..22. - // Result: [6..8, 8..16, 16..22] + // raw_size = max(4MB, ceil(16MB/4)) = 4MB + // expected_size = ceil(4MB/4MB)*4MB = 4MB + // min_tail_size = max(4MB, 8MB) = 8MB + // Head: 6..8MB (misalign=2MB). + // Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 16..22. 
+ // Result: [6..8, 8..12, 12..16, 16..22] let mb = 1024 * 1024u64; #[allow(clippy::single_range_in_vec_init)] let merged = vec![6 * mb..22 * mb]; let result = super::split_ranges_for_concurrency(merged, 4); assert_eq!( result, - vec![6 * mb..8 * mb, 8 * mb..16 * mb, 16 * mb..22 * mb] + vec![ + 6 * mb..8 * mb, + 8 * mb..12 * mb, + 12 * mb..16 * mb, + 16 * mb..22 * mb, + ] ); } #[test] fn test_split_already_aligned_8_to_24mb() { // 8MB..24MB, concurrency=4: - // raw_size = max(4MB, 4MB+1) = 4MB+1 - // expected_size = 8MB, min_tail_size = 8MB - // No misalign. Loop: 8+8=16 ≤ 24 → 8..16; 16+8=24 ≤ 24 → 16..24; offset=24 >= end → break. - // Result: [8..16, 16..24] + // raw_size = max(4MB, ceil(16MB/4)) = 4MB + // expected_size = 4MB, min_tail_size = 8MB + // No misalign. + // Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20; 24-20=4<8 → tail 20..24. + // Result: [8..12, 12..16, 16..20, 20..24] let mb = 1024 * 1024u64; #[allow(clippy::single_range_in_vec_init)] let merged = vec![8 * mb..24 * mb]; let result = super::split_ranges_for_concurrency(merged, 4); - assert_eq!(result, vec![8 * mb..16 * mb, 16 * mb..24 * mb]); + assert_eq!( + result, + vec![ + 8 * mb..12 * mb, + 12 * mb..16 * mb, + 16 * mb..20 * mb, + 20 * mb..24 * mb, + ] + ); } #[test] From 326ea852a2ffc98c9b4acbef115f7b8560def95f Mon Sep 17 00:00:00 2001 From: umi Date: Fri, 10 Apr 2026 16:48:16 +0800 Subject: [PATCH 21/24] rm --- crates/paimon/src/catalog/rest/rest_token_file_io.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/crates/paimon/src/catalog/rest/rest_token_file_io.rs b/crates/paimon/src/catalog/rest/rest_token_file_io.rs index 9e7a5a5d..6233eb10 100644 --- a/crates/paimon/src/catalog/rest/rest_token_file_io.rs +++ b/crates/paimon/src/catalog/rest/rest_token_file_io.rs @@ -93,14 +93,8 @@ impl RESTTokenFileIO { match token_guard.as_ref() { Some(token) => { // Merge catalog options (base) with token credentials (override) - // 
token.token["fs.oss.endpoint"] = oss-cn-hangzhou.aliyuncs.com - let mut token_with_endpoint = token.token.clone(); - token_with_endpoint.insert( - "fs.oss.endpoint".to_string(), - "oss-cn-hangzhou.aliyuncs.com".to_string(), - ); - let base = self.catalog_options.to_map().clone(); - let merged_props = RESTUtil::merge(Some(&base), Some(&token_with_endpoint)); + let merged_props = + RESTUtil::merge(Some(self.catalog_options.to_map()), Some(&token.token)); // Build FileIO with merged properties let mut builder = FileIO::from_path(&self.path)?; builder = builder.with_props(merged_props); From a222e2bf02249525b452061bcd6371e64a739ef5 Mon Sep 17 00:00:00 2001 From: umi Date: Fri, 10 Apr 2026 17:16:13 +0800 Subject: [PATCH 22/24] rm --- crates/paimon/src/arrow/format/parquet.rs | 682 +++++++++++----------- 1 file changed, 341 insertions(+), 341 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index b0aa0ec2..2863357a 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -785,17 +785,17 @@ struct ArrowFileReader { r: Box, } -/// coalesce threshold: 1 MiB. -const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; -/// concurrent range fetches. -const RANGE_FETCH_CONCURRENCY: usize = 10; -/// metadata prefetch hint: 512 KiB. -const METADATA_SIZE_HINT: usize = 512 * 1024; -/// Minimum range size for splitting: 4 MiB. -/// The block size used for split alignment and as the minimum split -/// granularity. Ranges smaller than this will not be split further to -/// avoid excessive small IO requests whose per-request overhead dominates. -const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024; +// /// coalesce threshold: 1 MiB. +// const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; +// /// concurrent range fetches. +// const RANGE_FETCH_CONCURRENCY: usize = 10; +// /// metadata prefetch hint: 512 KiB. 
+// const METADATA_SIZE_HINT: usize = 512 * 1024; +// /// Minimum range size for splitting: 4 MiB. +// /// The block size used for split alignment and as the minimum split +// /// granularity. Ranges smaller than this will not be split further to +// /// avoid excessive small IO requests whose per-request overhead dominates. +// const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024; impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { @@ -821,128 +821,128 @@ impl AsyncFileReader for ArrowFileReader { self.read_bytes(range) } - fn get_byte_ranges( - &mut self, - ranges: Vec>, - ) -> BoxFuture<'_, parquet::errors::Result>> { - let coalesce_bytes = RANGE_COALESCE_BYTES; - let concurrency = RANGE_FETCH_CONCURRENCY; - - async move { - if ranges.is_empty() { - return Ok(vec![]); - } - - // Two-phase range optimization: - // Phase 1: Merge nearby ranges based on coalesce threshold. - let coalesced = merge_byte_ranges(&ranges, coalesce_bytes); - // Phase 2: Split large merged ranges to utilize concurrency, - // but only at original range boundaries. - let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency); - - // Fetch merged ranges concurrently. - let r = &self.r; - let fetched: Vec = if fetch_ranges.len() <= concurrency { - // All ranges fit within the concurrency limit — fire them all at once. - futures::future::try_join_all(fetch_ranges.iter().map(|range| { - r.read(range.clone()) - .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into())) - })) - .await? - } else { - // More ranges than concurrency slots — use buffered stream. - futures::stream::iter(fetch_ranges.iter().cloned()) - .map(|range| async move { - r.read(range).await.map_err(|e| { - parquet::errors::ParquetError::External(format!("{e}").into()) - }) - }) - .buffered(concurrency) - .try_collect() - .await? - }; - - // Slice the fetched data back into the originally requested - // ranges. 
A single original range may span multiple fetch - // chunks, so we copy from as many chunks as needed. - let result: parquet::errors::Result> = ranges - .iter() - .map(|range| { - // Find the first fetch chunk whose end is past range.start. - let first = fetch_ranges.partition_point(|v| v.end <= range.start); - if first >= fetch_ranges.len() { - return Err(parquet::errors::ParquetError::General(format!( - "No fetch range covers requested range {}..{}", - range.start, range.end - ))); - } - - let need = (range.end - range.start) as usize; - - // Fast path: the original range fits entirely within one - // fetch chunk — zero-copy slice. - let fr = &fetch_ranges[first]; - if range.end <= fr.end { - let start = (range.start - fr.start) as usize; - let end = (range.end - fr.start) as usize; - return Ok(fetched[first].slice(start..end)); - } - - // Slow path: the original range spans multiple fetch - // chunks — copy pieces into a new buffer (mirrors Java's - // copyMultiBytesToBytes). - let mut buf = Vec::with_capacity(need); - let mut pos = range.start; - for i in first..fetch_ranges.len() { - if pos >= range.end { - break; - } - let fr = &fetch_ranges[i]; - let chunk = &fetched[i]; - let src_start = (pos - fr.start) as usize; - let src_end = ((range.end.min(fr.end)) - fr.start) as usize; - if src_end > chunk.len() { - return Err(parquet::errors::ParquetError::General(format!( - "Fetched data too short for range {}..{}: \ - chunk {}..{} has {} bytes, need up to offset {}", - range.start, - range.end, - fr.start, - fr.end, - chunk.len(), - src_end, - ))); - } - buf.extend_from_slice(&chunk[src_start..src_end]); - pos = fr.end; - } - if buf.len() != need { - return Err(parquet::errors::ParquetError::General(format!( - "Assembled {} bytes for range {}..{}, expected {}", - buf.len(), - range.start, - range.end, - need, - ))); - } - Ok(Bytes::from(buf)) - }) - .collect(); - result - } - .boxed() - } + // fn get_byte_ranges( + // &mut self, + // ranges: Vec>, + // ) -> 
BoxFuture<'_, parquet::errors::Result>> { + // let coalesce_bytes = RANGE_COALESCE_BYTES; + // let concurrency = RANGE_FETCH_CONCURRENCY; + + // async move { + // if ranges.is_empty() { + // return Ok(vec![]); + // } + + // // Two-phase range optimization: + // // Phase 1: Merge nearby ranges based on coalesce threshold. + // let coalesced = merge_byte_ranges(&ranges, coalesce_bytes); + // // Phase 2: Split large merged ranges to utilize concurrency, + // // but only at original range boundaries. + // let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency); + + // // Fetch merged ranges concurrently. + // let r = &self.r; + // let fetched: Vec = if fetch_ranges.len() <= concurrency { + // // All ranges fit within the concurrency limit — fire them all at once. + // futures::future::try_join_all(fetch_ranges.iter().map(|range| { + // r.read(range.clone()) + // .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into())) + // })) + // .await? + // } else { + // // More ranges than concurrency slots — use buffered stream. + // futures::stream::iter(fetch_ranges.iter().cloned()) + // .map(|range| async move { + // r.read(range).await.map_err(|e| { + // parquet::errors::ParquetError::External(format!("{e}").into()) + // }) + // }) + // .buffered(concurrency) + // .try_collect() + // .await? + // }; + + // // Slice the fetched data back into the originally requested + // // ranges. A single original range may span multiple fetch + // // chunks, so we copy from as many chunks as needed. + // let result: parquet::errors::Result> = ranges + // .iter() + // .map(|range| { + // // Find the first fetch chunk whose end is past range.start. 
+ // let first = fetch_ranges.partition_point(|v| v.end <= range.start); + // if first >= fetch_ranges.len() { + // return Err(parquet::errors::ParquetError::General(format!( + // "No fetch range covers requested range {}..{}", + // range.start, range.end + // ))); + // } + + // let need = (range.end - range.start) as usize; + + // // Fast path: the original range fits entirely within one + // // fetch chunk — zero-copy slice. + // let fr = &fetch_ranges[first]; + // if range.end <= fr.end { + // let start = (range.start - fr.start) as usize; + // let end = (range.end - fr.start) as usize; + // return Ok(fetched[first].slice(start..end)); + // } + + // // Slow path: the original range spans multiple fetch + // // chunks — copy pieces into a new buffer (mirrors Java's + // // copyMultiBytesToBytes). + // let mut buf = Vec::with_capacity(need); + // let mut pos = range.start; + // for i in first..fetch_ranges.len() { + // if pos >= range.end { + // break; + // } + // let fr = &fetch_ranges[i]; + // let chunk = &fetched[i]; + // let src_start = (pos - fr.start) as usize; + // let src_end = ((range.end.min(fr.end)) - fr.start) as usize; + // if src_end > chunk.len() { + // return Err(parquet::errors::ParquetError::General(format!( + // "Fetched data too short for range {}..{}: \ + // chunk {}..{} has {} bytes, need up to offset {}", + // range.start, + // range.end, + // fr.start, + // fr.end, + // chunk.len(), + // src_end, + // ))); + // } + // buf.extend_from_slice(&chunk[src_start..src_end]); + // pos = fr.end; + // } + // if buf.len() != need { + // return Err(parquet::errors::ParquetError::General(format!( + // "Assembled {} bytes for range {}..{}, expected {}", + // buf.len(), + // range.start, + // range.end, + // need, + // ))); + // } + // Ok(Bytes::from(buf)) + // }) + // .collect(); + // result + // } + // .boxed() + // } fn get_metadata( &mut self, options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result>> { let metadata_opts = 
options.map(|o| o.metadata_options().clone()); - let prefetch_hint = Some(METADATA_SIZE_HINT); + // let prefetch_hint = Some(METADATA_SIZE_HINT); Box::pin(async move { let file_size = self.file_size; let metadata = ParquetMetaDataReader::new() - .with_prefetch_hint(prefetch_hint) + // .with_prefetch_hint(prefetch_hint) .with_metadata_options(metadata_opts) .load_and_finish(self, file_size) .await?; @@ -959,89 +959,89 @@ impl AsyncFileReader for ArrowFileReader { /// /// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range. /// The input does not need to be sorted. -fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { - if ranges.is_empty() { - return vec![]; - } - - let mut sorted = ranges.to_vec(); - sorted.sort_unstable_by_key(|r| r.start); - - let mut merged = Vec::with_capacity(sorted.len()); - let mut start_idx = 0; - let mut end_idx = 1; - - while start_idx != sorted.len() { - let mut range_end = sorted[start_idx].end; - - while end_idx != sorted.len() - && sorted[end_idx] - .start - .checked_sub(range_end) - .map(|delta| delta <= coalesce) - .unwrap_or(true) - { - range_end = range_end.max(sorted[end_idx].end); - end_idx += 1; - } - - merged.push(sorted[start_idx].start..range_end); - start_idx = end_idx; - end_idx += 1; - } - - merged -} +// fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { +// if ranges.is_empty() { +// return vec![]; +// } + +// let mut sorted = ranges.to_vec(); +// sorted.sort_unstable_by_key(|r| r.start); + +// let mut merged = Vec::with_capacity(sorted.len()); +// let mut start_idx = 0; +// let mut end_idx = 1; + +// while start_idx != sorted.len() { +// let mut range_end = sorted[start_idx].end; + +// while end_idx != sorted.len() +// && sorted[end_idx] +// .start +// .checked_sub(range_end) +// .map(|delta| delta <= coalesce) +// .unwrap_or(true) +// { +// range_end = range_end.max(sorted[end_idx].end); +// end_idx += 1; +// } + +// merged.push(sorted[start_idx].start..range_end); +// 
start_idx = end_idx; +// end_idx += 1; +// } + +// merged +// } /// Split merged ranges into fixed-size batches to utilize concurrency, /// Each merged range is divided into chunks of `expected_size`, /// with the last chunk taking whatever remains. /// Ranges smaller than `2 * IO_BLOCK_SIZE` are kept as-is to /// avoid excessive small IO requests. -fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> Vec> { - if merged.is_empty() || concurrency <= 1 { - return merged; - } - - let mut result = Vec::with_capacity(merged.len()); - - for range in &merged { - let length = range.end - range.start; - let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64)); - // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that - // every split boundary is 4 MB-aligned relative to the range start. - let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE; - let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2); - - let mut offset = range.start; - let end = range.end; - - // Align the first split boundary: if `offset` is not 4 MB-aligned, - // emit a short head chunk so that all subsequent chunks start on a - // 4 MB boundary. 
- let misalign = offset % IO_BLOCK_SIZE; - if misalign != 0 { - let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end); - result.push(offset..first_end); - offset = first_end; - } - - loop { - if offset >= end { - break; - } - if end - offset < min_tail_size { - result.push(offset..end); - break; - } else { - result.push(offset..offset + expected_size); - offset += expected_size; - } - } - } - - result -} +// fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> Vec> { +// if merged.is_empty() || concurrency <= 1 { +// return merged; +// } + +// let mut result = Vec::with_capacity(merged.len()); + +// for range in &merged { +// let length = range.end - range.start; +// let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64)); +// // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that +// // every split boundary is 4 MB-aligned relative to the range start. +// let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE; +// let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2); + +// let mut offset = range.start; +// let end = range.end; + +// // Align the first split boundary: if `offset` is not 4 MB-aligned, +// // emit a short head chunk so that all subsequent chunks start on a +// // 4 MB boundary. 
+// let misalign = offset % IO_BLOCK_SIZE; +// if misalign != 0 { +// let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end); +// result.push(offset..first_end); +// offset = first_end; +// } + +// loop { +// if offset >= end { +// break; +// } +// if end - offset < min_tail_size { +// result.push(offset..end); +// break; +// } else { +// result.push(offset..offset + expected_size); +// offset += expected_size; +// } +// } +// } + +// result +// } // --------------------------------------------------------------------------- // Tests @@ -1101,144 +1101,144 @@ mod tests { // merge_byte_ranges tests // ----------------------------------------------------------------------- - #[test] - fn test_merge_byte_ranges_empty() { - assert_eq!( - super::merge_byte_ranges(&[], 1024), - Vec::>::new() - ); - } - - #[test] - fn test_merge_byte_ranges_no_coalesce() { - // Ranges far apart should not be merged - let ranges = vec![0..100, 1_000_000..1_000_100]; - let merged = super::merge_byte_ranges(&ranges, 1024); - assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]); - } - - #[test] - fn test_merge_byte_ranges_coalesce() { - // Ranges within the gap threshold should be merged - let ranges = vec![0..100, 200..300, 500..600]; - let merged = super::merge_byte_ranges(&ranges, 1024); - assert_eq!(merged, vec![0..600]); - } - - #[test] - fn test_merge_byte_ranges_zero_coalesce_gap() { - // With coalesce=0, ranges with a 1-byte gap should NOT merge - let ranges = vec![0..100, 101..200]; - let merged = super::merge_byte_ranges(&ranges, 0); - assert_eq!(merged, vec![0..100, 101..200]); - } - - // ----------------------------------------------------------------------- - // split_ranges_for_concurrency tests - // ----------------------------------------------------------------------- - - #[test] - fn test_split_aligned_range_0_to_20mb() { - // 0..20MB, concurrency=4: - // raw_size = max(4MB, 5MB+1) = 5MB+1 - // expected_size = ceil((5MB+1)/4MB)*4MB = 8MB - // min_tail_size = max(8MB, 
8MB) = 8MB - // No misalign. Chunks: [0..8, 8..16, 16..20] - let mb = 1024 * 1024u64; - #[allow(clippy::single_range_in_vec_init)] - let merged = vec![0..20 * mb]; - let result = super::split_ranges_for_concurrency(merged, 4); - assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]); - } - - #[test] - fn test_split_unaligned_start_6_to_14mb() { - // 6MB..14MB, concurrency=4: - // raw_size = max(4MB, 2MB+1) = 4MB - // expected_size = 4MB, min_tail_size = 8MB - // Head: 6..8MB. Loop: 8+8=16 > 14 → tail 8..14. - // Result: [6..8, 8..14] - let mb = 1024 * 1024u64; - #[allow(clippy::single_range_in_vec_init)] - let merged = vec![6 * mb..14 * mb]; - let result = super::split_ranges_for_concurrency(merged, 4); - assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]); - } - - #[test] - fn test_split_unaligned_start_6_to_22mb() { - // 6MB..22MB, concurrency=4: - // raw_size = max(4MB, ceil(16MB/4)) = 4MB - // expected_size = ceil(4MB/4MB)*4MB = 4MB - // min_tail_size = max(4MB, 8MB) = 8MB - // Head: 6..8MB (misalign=2MB). - // Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 16..22. - // Result: [6..8, 8..12, 12..16, 16..22] - let mb = 1024 * 1024u64; - #[allow(clippy::single_range_in_vec_init)] - let merged = vec![6 * mb..22 * mb]; - let result = super::split_ranges_for_concurrency(merged, 4); - assert_eq!( - result, - vec![ - 6 * mb..8 * mb, - 8 * mb..12 * mb, - 12 * mb..16 * mb, - 16 * mb..22 * mb, - ] - ); - } - - #[test] - fn test_split_already_aligned_8_to_24mb() { - // 8MB..24MB, concurrency=4: - // raw_size = max(4MB, ceil(16MB/4)) = 4MB - // expected_size = 4MB, min_tail_size = 8MB - // No misalign. - // Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20; 24-20=4<8 → tail 20..24. 
- // Result: [8..12, 12..16, 16..20, 20..24] - let mb = 1024 * 1024u64; - #[allow(clippy::single_range_in_vec_init)] - let merged = vec![8 * mb..24 * mb]; - let result = super::split_ranges_for_concurrency(merged, 4); - assert_eq!( - result, - vec![ - 8 * mb..12 * mb, - 12 * mb..16 * mb, - 16 * mb..20 * mb, - 20 * mb..24 * mb, - ] - ); - } - - #[test] - fn test_split_multiple_ranges() { - // [0..20MB, 24..44MB], concurrency=4: - // Range 0..20MB → [0..8, 8..16, 16..20] (same as test above) - // Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign. - // 24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44. - // Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44] - let mb = 1024 * 1024u64; - let merged = vec![0..20 * mb, 24 * mb..44 * mb]; - let result = super::split_ranges_for_concurrency(merged, 4); - assert_eq!( - result, - vec![ - 0..8 * mb, - 8 * mb..16 * mb, - 16 * mb..20 * mb, - 24 * mb..32 * mb, - 32 * mb..40 * mb, - 40 * mb..44 * mb, - ] - ); - } - - #[test] - fn test_split_empty() { - let merged: Vec> = vec![]; - let result = super::split_ranges_for_concurrency(merged, 4); - assert!(result.is_empty()); - } + // #[test] + // fn test_merge_byte_ranges_empty() { + // assert_eq!( + // super::merge_byte_ranges(&[], 1024), + // Vec::>::new() + // ); + // } + + // #[test] + // fn test_merge_byte_ranges_no_coalesce() { + // // Ranges far apart should not be merged + // let ranges = vec![0..100, 1_000_000..1_000_100]; + // let merged = super::merge_byte_ranges(&ranges, 1024); + // assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]); + // } + + // #[test] + // fn test_merge_byte_ranges_coalesce() { + // // Ranges within the gap threshold should be merged + // let ranges = vec![0..100, 200..300, 500..600]; + // let merged = super::merge_byte_ranges(&ranges, 1024); + // assert_eq!(merged, vec![0..600]); + // } + + // #[test] + // fn test_merge_byte_ranges_zero_coalesce_gap() { + // // With coalesce=0, ranges with a 1-byte 
gap should NOT merge + // let ranges = vec![0..100, 101..200]; + // let merged = super::merge_byte_ranges(&ranges, 0); + // assert_eq!(merged, vec![0..100, 101..200]); + // } + + // // ----------------------------------------------------------------------- + // // split_ranges_for_concurrency tests + // // ----------------------------------------------------------------------- + + // #[test] + // fn test_split_aligned_range_0_to_20mb() { + // // 0..20MB, concurrency=4: + // // raw_size = max(4MB, 5MB+1) = 5MB+1 + // // expected_size = ceil((5MB+1)/4MB)*4MB = 8MB + // // min_tail_size = max(8MB, 8MB) = 8MB + // // No misalign. Chunks: [0..8, 8..16, 16..20] + // let mb = 1024 * 1024u64; + // #[allow(clippy::single_range_in_vec_init)] + // let merged = vec![0..20 * mb]; + // let result = super::split_ranges_for_concurrency(merged, 4); + // assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]); + // } + + // #[test] + // fn test_split_unaligned_start_6_to_14mb() { + // // 6MB..14MB, concurrency=4: + // // raw_size = max(4MB, 2MB+1) = 4MB + // // expected_size = 4MB, min_tail_size = 8MB + // // Head: 6..8MB. Loop: 8+8=16 > 14 → tail 8..14. + // // Result: [6..8, 8..14] + // let mb = 1024 * 1024u64; + // #[allow(clippy::single_range_in_vec_init)] + // let merged = vec![6 * mb..14 * mb]; + // let result = super::split_ranges_for_concurrency(merged, 4); + // assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]); + // } + + // #[test] + // fn test_split_unaligned_start_6_to_22mb() { + // // 6MB..22MB, concurrency=4: + // // raw_size = max(4MB, ceil(16MB/4)) = 4MB + // // expected_size = ceil(4MB/4MB)*4MB = 4MB + // // min_tail_size = max(4MB, 8MB) = 8MB + // // Head: 6..8MB (misalign=2MB). + // // Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 16..22. 
+ // // Result: [6..8, 8..12, 12..16, 16..22] + // let mb = 1024 * 1024u64; + // #[allow(clippy::single_range_in_vec_init)] + // let merged = vec![6 * mb..22 * mb]; + // let result = super::split_ranges_for_concurrency(merged, 4); + // assert_eq!( + // result, + // vec![ + // 6 * mb..8 * mb, + // 8 * mb..12 * mb, + // 12 * mb..16 * mb, + // 16 * mb..22 * mb, + // ] + // ); + // } + + // #[test] + // fn test_split_already_aligned_8_to_24mb() { + // // 8MB..24MB, concurrency=4: + // // raw_size = max(4MB, ceil(16MB/4)) = 4MB + // // expected_size = 4MB, min_tail_size = 8MB + // // No misalign. + // // Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20; 24-20=4<8 → tail 20..24. + // // Result: [8..12, 12..16, 16..20, 20..24] + // let mb = 1024 * 1024u64; + // #[allow(clippy::single_range_in_vec_init)] + // let merged = vec![8 * mb..24 * mb]; + // let result = super::split_ranges_for_concurrency(merged, 4); + // assert_eq!( + // result, + // vec![ + // 8 * mb..12 * mb, + // 12 * mb..16 * mb, + // 16 * mb..20 * mb, + // 20 * mb..24 * mb, + // ] + // ); + // } + + // #[test] + // fn test_split_multiple_ranges() { + // // [0..20MB, 24..44MB], concurrency=4: + // // Range 0..20MB → [0..8, 8..16, 16..20] (same as test above) + // // Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign. + // // 24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44. 
+ // // Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44] + // let mb = 1024 * 1024u64; + // let merged = vec![0..20 * mb, 24 * mb..44 * mb]; + // let result = super::split_ranges_for_concurrency(merged, 4); + // assert_eq!( + // result, + // vec![ + // 0..8 * mb, + // 8 * mb..16 * mb, + // 16 * mb..20 * mb, + // 24 * mb..32 * mb, + // 32 * mb..40 * mb, + // 40 * mb..44 * mb, + // ] + // ); + // } + + // #[test] + // fn test_split_empty() { + // let merged: Vec> = vec![]; + // let result = super::split_ranges_for_concurrency(merged, 4); + // assert!(result.is_empty()); + // } } From 0c85ef90f668aaf614494f7f29d366893602216c Mon Sep 17 00:00:00 2001 From: umi Date: Fri, 10 Apr 2026 17:20:04 +0800 Subject: [PATCH 23/24] test --- crates/paimon/src/arrow/format/parquet.rs | 365 +--------------------- 1 file changed, 1 insertion(+), 364 deletions(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 2863357a..915bd565 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -33,7 +33,7 @@ use arrow_schema::ArrowError; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt}; use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; @@ -785,18 +785,6 @@ struct ArrowFileReader { r: Box, } -// /// coalesce threshold: 1 MiB. -// const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; -// /// concurrent range fetches. -// const RANGE_FETCH_CONCURRENCY: usize = 10; -// /// metadata prefetch hint: 512 KiB. -// const METADATA_SIZE_HINT: usize = 512 * 1024; -// /// Minimum range size for splitting: 4 MiB. -// /// The block size used for split alignment and as the minimum split -// /// granularity. 
Ranges smaller than this will not be split further to -// /// avoid excessive small IO requests whose per-request overhead dominates. -// const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024; - impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { Self { file_size, r } @@ -821,128 +809,14 @@ impl AsyncFileReader for ArrowFileReader { self.read_bytes(range) } - // fn get_byte_ranges( - // &mut self, - // ranges: Vec>, - // ) -> BoxFuture<'_, parquet::errors::Result>> { - // let coalesce_bytes = RANGE_COALESCE_BYTES; - // let concurrency = RANGE_FETCH_CONCURRENCY; - - // async move { - // if ranges.is_empty() { - // return Ok(vec![]); - // } - - // // Two-phase range optimization: - // // Phase 1: Merge nearby ranges based on coalesce threshold. - // let coalesced = merge_byte_ranges(&ranges, coalesce_bytes); - // // Phase 2: Split large merged ranges to utilize concurrency, - // // but only at original range boundaries. - // let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency); - - // // Fetch merged ranges concurrently. - // let r = &self.r; - // let fetched: Vec = if fetch_ranges.len() <= concurrency { - // // All ranges fit within the concurrency limit — fire them all at once. - // futures::future::try_join_all(fetch_ranges.iter().map(|range| { - // r.read(range.clone()) - // .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into())) - // })) - // .await? - // } else { - // // More ranges than concurrency slots — use buffered stream. - // futures::stream::iter(fetch_ranges.iter().cloned()) - // .map(|range| async move { - // r.read(range).await.map_err(|e| { - // parquet::errors::ParquetError::External(format!("{e}").into()) - // }) - // }) - // .buffered(concurrency) - // .try_collect() - // .await? - // }; - - // // Slice the fetched data back into the originally requested - // // ranges. A single original range may span multiple fetch - // // chunks, so we copy from as many chunks as needed. 
- // let result: parquet::errors::Result> = ranges - // .iter() - // .map(|range| { - // // Find the first fetch chunk whose end is past range.start. - // let first = fetch_ranges.partition_point(|v| v.end <= range.start); - // if first >= fetch_ranges.len() { - // return Err(parquet::errors::ParquetError::General(format!( - // "No fetch range covers requested range {}..{}", - // range.start, range.end - // ))); - // } - - // let need = (range.end - range.start) as usize; - - // // Fast path: the original range fits entirely within one - // // fetch chunk — zero-copy slice. - // let fr = &fetch_ranges[first]; - // if range.end <= fr.end { - // let start = (range.start - fr.start) as usize; - // let end = (range.end - fr.start) as usize; - // return Ok(fetched[first].slice(start..end)); - // } - - // // Slow path: the original range spans multiple fetch - // // chunks — copy pieces into a new buffer (mirrors Java's - // // copyMultiBytesToBytes). - // let mut buf = Vec::with_capacity(need); - // let mut pos = range.start; - // for i in first..fetch_ranges.len() { - // if pos >= range.end { - // break; - // } - // let fr = &fetch_ranges[i]; - // let chunk = &fetched[i]; - // let src_start = (pos - fr.start) as usize; - // let src_end = ((range.end.min(fr.end)) - fr.start) as usize; - // if src_end > chunk.len() { - // return Err(parquet::errors::ParquetError::General(format!( - // "Fetched data too short for range {}..{}: \ - // chunk {}..{} has {} bytes, need up to offset {}", - // range.start, - // range.end, - // fr.start, - // fr.end, - // chunk.len(), - // src_end, - // ))); - // } - // buf.extend_from_slice(&chunk[src_start..src_end]); - // pos = fr.end; - // } - // if buf.len() != need { - // return Err(parquet::errors::ParquetError::General(format!( - // "Assembled {} bytes for range {}..{}, expected {}", - // buf.len(), - // range.start, - // range.end, - // need, - // ))); - // } - // Ok(Bytes::from(buf)) - // }) - // .collect(); - // result - // } - // 
.boxed() - // } - fn get_metadata( &mut self, options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result>> { let metadata_opts = options.map(|o| o.metadata_options().clone()); - // let prefetch_hint = Some(METADATA_SIZE_HINT); Box::pin(async move { let file_size = self.file_size; let metadata = ParquetMetaDataReader::new() - // .with_prefetch_hint(prefetch_hint) .with_metadata_options(metadata_opts) .load_and_finish(self, file_size) .await?; @@ -951,98 +825,6 @@ impl AsyncFileReader for ArrowFileReader { } } -// --------------------------------------------------------------------------- -// Range coalescing -// --------------------------------------------------------------------------- - -/// Merge nearby byte ranges to reduce the number of requests. -/// -/// Ranges whose gap is ≤ `coalesce` bytes are merged into a single range. -/// The input does not need to be sorted. -// fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { -// if ranges.is_empty() { -// return vec![]; -// } - -// let mut sorted = ranges.to_vec(); -// sorted.sort_unstable_by_key(|r| r.start); - -// let mut merged = Vec::with_capacity(sorted.len()); -// let mut start_idx = 0; -// let mut end_idx = 1; - -// while start_idx != sorted.len() { -// let mut range_end = sorted[start_idx].end; - -// while end_idx != sorted.len() -// && sorted[end_idx] -// .start -// .checked_sub(range_end) -// .map(|delta| delta <= coalesce) -// .unwrap_or(true) -// { -// range_end = range_end.max(sorted[end_idx].end); -// end_idx += 1; -// } - -// merged.push(sorted[start_idx].start..range_end); -// start_idx = end_idx; -// end_idx += 1; -// } - -// merged -// } - -/// Split merged ranges into fixed-size batches to utilize concurrency, -/// Each merged range is divided into chunks of `expected_size`, -/// with the last chunk taking whatever remains. -/// Ranges smaller than `2 * IO_BLOCK_SIZE` are kept as-is to -/// avoid excessive small IO requests. 
-// fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> Vec> { -// if merged.is_empty() || concurrency <= 1 { -// return merged; -// } - -// let mut result = Vec::with_capacity(merged.len()); - -// for range in &merged { -// let length = range.end - range.start; -// let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64)); -// // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that -// // every split boundary is 4 MB-aligned relative to the range start. -// let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE; -// let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2); - -// let mut offset = range.start; -// let end = range.end; - -// // Align the first split boundary: if `offset` is not 4 MB-aligned, -// // emit a short head chunk so that all subsequent chunks start on a -// // 4 MB boundary. -// let misalign = offset % IO_BLOCK_SIZE; -// if misalign != 0 { -// let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end); -// result.push(offset..first_end); -// offset = first_end; -// } - -// loop { -// if offset >= end { -// break; -// } -// if end - offset < min_tail_size { -// result.push(offset..end); -// break; -// } else { -// result.push(offset..offset + expected_size); -// offset += expected_size; -// } -// } -// } - -// result -// } - // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -1096,149 +878,4 @@ mod tests { assert!(row_filter.is_some()); } - - // ----------------------------------------------------------------------- - // merge_byte_ranges tests - // ----------------------------------------------------------------------- - - // #[test] - // fn test_merge_byte_ranges_empty() { - // assert_eq!( - // super::merge_byte_ranges(&[], 1024), - // Vec::>::new() - // ); - // } - - // #[test] - // fn test_merge_byte_ranges_no_coalesce() { - // // Ranges far apart should not be 
merged - // let ranges = vec![0..100, 1_000_000..1_000_100]; - // let merged = super::merge_byte_ranges(&ranges, 1024); - // assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]); - // } - - // #[test] - // fn test_merge_byte_ranges_coalesce() { - // // Ranges within the gap threshold should be merged - // let ranges = vec![0..100, 200..300, 500..600]; - // let merged = super::merge_byte_ranges(&ranges, 1024); - // assert_eq!(merged, vec![0..600]); - // } - - // #[test] - // fn test_merge_byte_ranges_zero_coalesce_gap() { - // // With coalesce=0, ranges with a 1-byte gap should NOT merge - // let ranges = vec![0..100, 101..200]; - // let merged = super::merge_byte_ranges(&ranges, 0); - // assert_eq!(merged, vec![0..100, 101..200]); - // } - - // // ----------------------------------------------------------------------- - // // split_ranges_for_concurrency tests - // // ----------------------------------------------------------------------- - - // #[test] - // fn test_split_aligned_range_0_to_20mb() { - // // 0..20MB, concurrency=4: - // // raw_size = max(4MB, 5MB+1) = 5MB+1 - // // expected_size = ceil((5MB+1)/4MB)*4MB = 8MB - // // min_tail_size = max(8MB, 8MB) = 8MB - // // No misalign. Chunks: [0..8, 8..16, 16..20] - // let mb = 1024 * 1024u64; - // #[allow(clippy::single_range_in_vec_init)] - // let merged = vec![0..20 * mb]; - // let result = super::split_ranges_for_concurrency(merged, 4); - // assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]); - // } - - // #[test] - // fn test_split_unaligned_start_6_to_14mb() { - // // 6MB..14MB, concurrency=4: - // // raw_size = max(4MB, 2MB+1) = 4MB - // // expected_size = 4MB, min_tail_size = 8MB - // // Head: 6..8MB. Loop: 8+8=16 > 14 → tail 8..14. 
- // // Result: [6..8, 8..14] - // let mb = 1024 * 1024u64; - // #[allow(clippy::single_range_in_vec_init)] - // let merged = vec![6 * mb..14 * mb]; - // let result = super::split_ranges_for_concurrency(merged, 4); - // assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]); - // } - - // #[test] - // fn test_split_unaligned_start_6_to_22mb() { - // // 6MB..22MB, concurrency=4: - // // raw_size = max(4MB, ceil(16MB/4)) = 4MB - // // expected_size = ceil(4MB/4MB)*4MB = 4MB - // // min_tail_size = max(4MB, 8MB) = 8MB - // // Head: 6..8MB (misalign=2MB). - // // Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 16..22. - // // Result: [6..8, 8..12, 12..16, 16..22] - // let mb = 1024 * 1024u64; - // #[allow(clippy::single_range_in_vec_init)] - // let merged = vec![6 * mb..22 * mb]; - // let result = super::split_ranges_for_concurrency(merged, 4); - // assert_eq!( - // result, - // vec![ - // 6 * mb..8 * mb, - // 8 * mb..12 * mb, - // 12 * mb..16 * mb, - // 16 * mb..22 * mb, - // ] - // ); - // } - - // #[test] - // fn test_split_already_aligned_8_to_24mb() { - // // 8MB..24MB, concurrency=4: - // // raw_size = max(4MB, ceil(16MB/4)) = 4MB - // // expected_size = 4MB, min_tail_size = 8MB - // // No misalign. - // // Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20; 24-20=4<8 → tail 20..24. - // // Result: [8..12, 12..16, 16..20, 20..24] - // let mb = 1024 * 1024u64; - // #[allow(clippy::single_range_in_vec_init)] - // let merged = vec![8 * mb..24 * mb]; - // let result = super::split_ranges_for_concurrency(merged, 4); - // assert_eq!( - // result, - // vec![ - // 8 * mb..12 * mb, - // 12 * mb..16 * mb, - // 16 * mb..20 * mb, - // 20 * mb..24 * mb, - // ] - // ); - // } - - // #[test] - // fn test_split_multiple_ranges() { - // // [0..20MB, 24..44MB], concurrency=4: - // // Range 0..20MB → [0..8, 8..16, 16..20] (same as test above) - // // Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign. 
- // // 24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44. - // // Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44] - // let mb = 1024 * 1024u64; - // let merged = vec![0..20 * mb, 24 * mb..44 * mb]; - // let result = super::split_ranges_for_concurrency(merged, 4); - // assert_eq!( - // result, - // vec![ - // 0..8 * mb, - // 8 * mb..16 * mb, - // 16 * mb..20 * mb, - // 24 * mb..32 * mb, - // 32 * mb..40 * mb, - // 40 * mb..44 * mb, - // ] - // ); - // } - - // #[test] - // fn test_split_empty() { - // let merged: Vec> = vec![]; - // let result = super::split_ranges_for_concurrency(merged, 4); - // assert!(result.is_empty()); - // } } From 17690ef9dd2d278f9cefb5bcc14dcfee7df970d0 Mon Sep 17 00:00:00 2001 From: umi Date: Fri, 10 Apr 2026 17:32:49 +0800 Subject: [PATCH 24/24] fix --- crates/paimon/src/arrow/format/parquet.rs | 365 +++++++++++++++++++++- 1 file changed, 364 insertions(+), 1 deletion(-) diff --git a/crates/paimon/src/arrow/format/parquet.rs b/crates/paimon/src/arrow/format/parquet.rs index 915bd565..b0aa0ec2 100644 --- a/crates/paimon/src/arrow/format/parquet.rs +++ b/crates/paimon/src/arrow/format/parquet.rs @@ -33,7 +33,7 @@ use arrow_schema::ArrowError; use async_trait::async_trait; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{StreamExt, TryFutureExt}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector, }; @@ -785,6 +785,18 @@ struct ArrowFileReader { r: Box, } +/// coalesce threshold: 1 MiB. +const RANGE_COALESCE_BYTES: u64 = 1024 * 1024; +/// concurrent range fetches. +const RANGE_FETCH_CONCURRENCY: usize = 10; +/// metadata prefetch hint: 512 KiB. +const METADATA_SIZE_HINT: usize = 512 * 1024; +/// Minimum range size for splitting: 4 MiB. +/// The block size used for split alignment and as the minimum split +/// granularity. 
Ranges smaller than this will not be split further to +/// avoid excessive small IO requests whose per-request overhead dominates. +const IO_BLOCK_SIZE: u64 = 4 * 1024 * 1024; + impl ArrowFileReader { fn new(file_size: u64, r: Box) -> Self { Self { file_size, r } @@ -809,14 +821,128 @@ impl AsyncFileReader for ArrowFileReader { self.read_bytes(range) } + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + let coalesce_bytes = RANGE_COALESCE_BYTES; + let concurrency = RANGE_FETCH_CONCURRENCY; + + async move { + if ranges.is_empty() { + return Ok(vec![]); + } + + // Two-phase range optimization: + // Phase 1: Merge nearby ranges based on coalesce threshold. + let coalesced = merge_byte_ranges(&ranges, coalesce_bytes); + // Phase 2: Split large merged ranges into IO_BLOCK_SIZE-aligned chunks to + // utilize concurrency; a split may fall inside an original range. + let fetch_ranges = split_ranges_for_concurrency(coalesced, concurrency); + + // Fetch merged ranges concurrently. + let r = &self.r; + let fetched: Vec = if fetch_ranges.len() <= concurrency { + // All ranges fit within the concurrency limit — fire them all at once. + futures::future::try_join_all(fetch_ranges.iter().map(|range| { + r.read(range.clone()) + .map_err(|e| parquet::errors::ParquetError::External(format!("{e}").into())) + })) + .await? + } else { + // More ranges than concurrency slots — use buffered stream. + futures::stream::iter(fetch_ranges.iter().cloned()) + .map(|range| async move { + r.read(range).await.map_err(|e| { + parquet::errors::ParquetError::External(format!("{e}").into()) + }) + }) + .buffered(concurrency) + .try_collect() + .await? + }; + + // Slice the fetched data back into the originally requested + // ranges. A single original range may span multiple fetch + // chunks, so we copy from as many chunks as needed. + let result: parquet::errors::Result> = ranges + .iter() + .map(|range| { + // Find the first fetch chunk whose end is past range.start. 
+ let first = fetch_ranges.partition_point(|v| v.end <= range.start); + if first >= fetch_ranges.len() { + return Err(parquet::errors::ParquetError::General(format!( + "No fetch range covers requested range {}..{}", + range.start, range.end + ))); + } + + let need = (range.end - range.start) as usize; + + // Fast path: the original range fits entirely within one + // fetch chunk — zero-copy slice. + let fr = &fetch_ranges[first]; + if range.end <= fr.end { + let start = (range.start - fr.start) as usize; + let end = (range.end - fr.start) as usize; + return Ok(fetched[first].slice(start..end)); + } + + // Slow path: the original range spans multiple fetch + // chunks — copy pieces into a new buffer (mirrors Java's + // copyMultiBytesToBytes). + let mut buf = Vec::with_capacity(need); + let mut pos = range.start; + for i in first..fetch_ranges.len() { + if pos >= range.end { + break; + } + let fr = &fetch_ranges[i]; + let chunk = &fetched[i]; + let src_start = (pos - fr.start) as usize; + let src_end = ((range.end.min(fr.end)) - fr.start) as usize; + if src_end > chunk.len() { + return Err(parquet::errors::ParquetError::General(format!( + "Fetched data too short for range {}..{}: \ + chunk {}..{} has {} bytes, need up to offset {}", + range.start, + range.end, + fr.start, + fr.end, + chunk.len(), + src_end, + ))); + } + buf.extend_from_slice(&chunk[src_start..src_end]); + pos = fr.end; + } + if buf.len() != need { + return Err(parquet::errors::ParquetError::General(format!( + "Assembled {} bytes for range {}..{}, expected {}", + buf.len(), + range.start, + range.end, + need, + ))); + } + Ok(Bytes::from(buf)) + }) + .collect(); + result + } + .boxed() + } + fn get_metadata( &mut self, options: Option<&ArrowReaderOptions>, ) -> BoxFuture<'_, parquet::errors::Result>> { let metadata_opts = options.map(|o| o.metadata_options().clone()); + let prefetch_hint = Some(METADATA_SIZE_HINT); Box::pin(async move { let file_size = self.file_size; let metadata = 
ParquetMetaDataReader::new() + .with_prefetch_hint(prefetch_hint) .with_metadata_options(metadata_opts) .load_and_finish(self, file_size) .await?; @@ -825,6 +951,98 @@ impl AsyncFileReader for ArrowFileReader { } } +// --------------------------------------------------------------------------- +// Range coalescing +// --------------------------------------------------------------------------- + +/// Merge nearby byte ranges to reduce the number of requests. +/// +/// Ranges that overlap or whose gap is ≤ `coalesce` bytes are merged into a +/// single range. The input does not need to be sorted; the output is sorted by start. +fn merge_byte_ranges(ranges: &[Range], coalesce: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut sorted = ranges.to_vec(); + sorted.sort_unstable_by_key(|r| r.start); + + let mut merged = Vec::with_capacity(sorted.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != sorted.len() { + let mut range_end = sorted[start_idx].end; + + while end_idx != sorted.len() + && sorted[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(sorted[end_idx].end); + end_idx += 1; + } + + merged.push(sorted[start_idx].start..range_end); + start_idx = end_idx; + end_idx += 1; + } + + merged +} + +/// Split merged ranges into block-aligned chunks to utilize concurrency. +/// Each merged range is divided into chunks of `expected_size` (after an +/// optional short head chunk that aligns the start to `IO_BLOCK_SIZE`), with +/// the last chunk taking whatever remains. A remainder smaller than +/// `max(expected_size, 2 * IO_BLOCK_SIZE)` is never split further. 
+fn split_ranges_for_concurrency(merged: Vec>, concurrency: usize) -> Vec> { + if merged.is_empty() || concurrency <= 1 { + return merged; + } + + let mut result = Vec::with_capacity(merged.len()); + + for range in &merged { + let length = range.end - range.start; + let raw_size = IO_BLOCK_SIZE.max(length.div_ceil(concurrency as u64)); + // Round up to the nearest multiple of IO_BLOCK_SIZE (4 MB) so that + // every split boundary is 4 MB-aligned relative to the range start. + let expected_size = raw_size.div_ceil(IO_BLOCK_SIZE) * IO_BLOCK_SIZE; + let min_tail_size = expected_size.max(IO_BLOCK_SIZE * 2); + + let mut offset = range.start; + let end = range.end; + + // Align the first split boundary: if `offset` is not 4 MB-aligned, + // emit a short head chunk so that all subsequent chunks start on a + // 4 MB boundary. + let misalign = offset % IO_BLOCK_SIZE; + if misalign != 0 { + let first_end = (offset - misalign + IO_BLOCK_SIZE).min(end); + result.push(offset..first_end); + offset = first_end; + } + + loop { + if offset >= end { + break; + } + if end - offset < min_tail_size { + result.push(offset..end); + break; + } else { + result.push(offset..offset + expected_size); + offset += expected_size; + } + } + } + + result +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -878,4 +1096,149 @@ mod tests { assert!(row_filter.is_some()); } + + // ----------------------------------------------------------------------- + // merge_byte_ranges tests + // ----------------------------------------------------------------------- + + #[test] + fn test_merge_byte_ranges_empty() { + assert_eq!( + super::merge_byte_ranges(&[], 1024), + Vec::>::new() + ); + } + + #[test] + fn test_merge_byte_ranges_no_coalesce() { + // Ranges far apart should not be merged + let ranges = vec![0..100, 1_000_000..1_000_100]; + let merged = super::merge_byte_ranges(&ranges, 
1024); + assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]); + } + + #[test] + fn test_merge_byte_ranges_coalesce() { + // Ranges within the gap threshold should be merged + let ranges = vec![0..100, 200..300, 500..600]; + let merged = super::merge_byte_ranges(&ranges, 1024); + assert_eq!(merged, vec![0..600]); + } + + #[test] + fn test_merge_byte_ranges_zero_coalesce_gap() { + // With coalesce=0, ranges with a 1-byte gap should NOT merge + let ranges = vec![0..100, 101..200]; + let merged = super::merge_byte_ranges(&ranges, 0); + assert_eq!(merged, vec![0..100, 101..200]); + } + + // ----------------------------------------------------------------------- + // split_ranges_for_concurrency tests + // ----------------------------------------------------------------------- + + #[test] + fn test_split_aligned_range_0_to_20mb() { + // 0..20MB, concurrency=4: + // raw_size = max(4MB, ceil(20MB/4)) = 5MB + // expected_size = ceil(5MB/4MB)*4MB = 8MB + // min_tail_size = max(8MB, 8MB) = 8MB + // No misalign. Chunks: [0..8, 8..16, 16..20] + let mb = 1024 * 1024u64; + #[allow(clippy::single_range_in_vec_init)] + let merged = vec![0..20 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!(result, vec![0..8 * mb, 8 * mb..16 * mb, 16 * mb..20 * mb]); + } + + #[test] + fn test_split_unaligned_start_6_to_14mb() { + // 6MB..14MB, concurrency=4: + // raw_size = max(4MB, ceil(8MB/4)) = 4MB + // expected_size = 4MB, min_tail_size = 8MB + // Head: 6..8MB. Loop: 14-8=6<8 → tail 8..14. 
+ // Result: [6..8, 8..14] + let mb = 1024 * 1024u64; + #[allow(clippy::single_range_in_vec_init)] + let merged = vec![6 * mb..14 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!(result, vec![6 * mb..8 * mb, 8 * mb..14 * mb]); + } + + #[test] + fn test_split_unaligned_start_6_to_22mb() { + // 6MB..22MB, concurrency=4: + // raw_size = max(4MB, ceil(16MB/4)) = 4MB + // expected_size = ceil(4MB/4MB)*4MB = 4MB + // min_tail_size = max(4MB, 8MB) = 8MB + // Head: 6..8MB (misalign=2MB). + // Loop: 22-8=14≥8 → 8..12; 22-12=10≥8 → 12..16; 22-16=6<8 → tail 16..22. + // Result: [6..8, 8..12, 12..16, 16..22] + let mb = 1024 * 1024u64; + #[allow(clippy::single_range_in_vec_init)] + let merged = vec![6 * mb..22 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!( + result, + vec![ + 6 * mb..8 * mb, + 8 * mb..12 * mb, + 12 * mb..16 * mb, + 16 * mb..22 * mb, + ] + ); + } + + #[test] + fn test_split_already_aligned_8_to_24mb() { + // 8MB..24MB, concurrency=4: + // raw_size = max(4MB, ceil(16MB/4)) = 4MB + // expected_size = 4MB, min_tail_size = 8MB + // No misalign. + // Loop: 24-8=16≥8 → 8..12; 24-12=12≥8 → 12..16; 24-16=8≥8 → 16..20; 24-20=4<8 → tail 20..24. + // Result: [8..12, 12..16, 16..20, 20..24] + let mb = 1024 * 1024u64; + #[allow(clippy::single_range_in_vec_init)] + let merged = vec![8 * mb..24 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!( + result, + vec![ + 8 * mb..12 * mb, + 12 * mb..16 * mb, + 16 * mb..20 * mb, + 20 * mb..24 * mb, + ] + ); + } + + #[test] + fn test_split_multiple_ranges() { + // [0..20MB, 24..44MB], concurrency=4: + // Range 0..20MB → [0..8, 8..16, 16..20] (same as test above) + // Range 24..44MB (20MB): expected_size=8MB, min_tail_size=8MB, no misalign. + // 24+8=32 ≤ 44 → 24..32; 32+8=40 ≤ 44 → 32..40; 40+8=48 > 44 → tail 40..44. 
+ // Result: [0..8, 8..16, 16..20, 24..32, 32..40, 40..44] + let mb = 1024 * 1024u64; + let merged = vec![0..20 * mb, 24 * mb..44 * mb]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert_eq!( + result, + vec![ + 0..8 * mb, + 8 * mb..16 * mb, + 16 * mb..20 * mb, + 24 * mb..32 * mb, + 32 * mb..40 * mb, + 40 * mb..44 * mb, + ] + ); + } + + #[test] + fn test_split_empty() { + let merged: Vec> = vec![]; + let result = super::split_ranges_for_concurrency(merged, 4); + assert!(result.is_empty()); + } }