From c0499e2905c218b713e9a8e347236e1422359f56 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Tue, 7 Apr 2026 15:39:57 +0800 Subject: [PATCH 1/7] feat(datafusion): support fetch contract in PaimonTableScan --- .../datafusion/src/filter_pushdown.rs | 38 ++ crates/integrations/datafusion/src/lib.rs | 1 - .../datafusion/src/physical_plan/mod.rs | 2 +- .../datafusion/src/physical_plan/scan.rs | 564 ++++++++++++++++-- .../integrations/datafusion/src/table/mod.rs | 136 +++-- .../datafusion/tests/read_tables.rs | 193 ++++-- crates/paimon/src/table/mod.rs | 2 +- crates/paimon/src/table/table_scan.rs | 170 ++++-- 8 files changed, 911 insertions(+), 195 deletions(-) diff --git a/crates/integrations/datafusion/src/filter_pushdown.rs b/crates/integrations/datafusion/src/filter_pushdown.rs index 91c65d3d..b1a1ad93 100644 --- a/crates/integrations/datafusion/src/filter_pushdown.rs +++ b/crates/integrations/datafusion/src/filter_pushdown.rs @@ -20,6 +20,17 @@ use datafusion::logical_expr::expr::InList; use datafusion::logical_expr::{Between, BinaryExpr, Expr, Operator, TableProviderFilterPushDown}; use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder}; +/// Whether scan-side row-count pruning can safely participate in physical fetch pushdown. +/// +/// `Exact` means every query filter is exact at the table-provider boundary, so +/// scan-side split pruning and physical fetch execution can be enabled without +/// leaving residual filtering above the scan. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ScanFetchMode { + Disabled, + Exact, +} + pub(crate) fn classify_filter_pushdown( filter: &Expr, fields: &[DataField], @@ -58,6 +69,23 @@ pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> } } +pub(crate) fn scan_fetch_mode( + filters: &[Expr], + fields: &[DataField], + partition_keys: &[String], +) -> ScanFetchMode { + if filters.is_empty() + || filters.iter().all(|filter| { + classify_filter_pushdown(filter, fields, partition_keys) + == TableProviderFilterPushDown::Exact + }) + { + ScanFetchMode::Exact + } else { + ScanFetchMode::Disabled + } +} + fn split_conjunction(expr: &Expr) -> Vec<&Expr> { match expr { Expr::BinaryExpr(BinaryExpr { @@ -392,6 +420,16 @@ mod tests { ); } + #[test] + fn test_scan_fetch_mode_is_exact_without_filters() { + let fields = test_fields(); + + assert_eq!( + scan_fetch_mode(&[], &fields, &partition_keys()), + ScanFetchMode::Exact + ); + } + #[test] fn test_translate_reversed_partition_comparison() { let fields = test_fields(); diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 8454bf77..1ae212fa 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -46,6 +46,5 @@ mod table; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; pub use error::to_datafusion_error; -pub use physical_plan::PaimonTableScan; pub use relation_planner::PaimonRelationPlanner; pub use table::PaimonTableProvider; diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index 48aa5469..1f06658f 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -17,4 +17,4 @@ pub(crate) mod scan; -pub use scan::PaimonTableScan; +pub(crate) use scan::PaimonTableScan; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index d7dfc7aa..e1f441c0 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -16,53 +16,284 @@ // under the License. use std::any::Any; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result as DFResult; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; -use futures::{StreamExt, TryStreamExt}; +use datafusion::physical_plan::{ + DisplayAs, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, +}; +use futures::{Stream, StreamExt, TryStreamExt}; use paimon::spec::Predicate; -use paimon::table::Table; +use paimon::table::{prune_splits_by_limit_hint, Table}; use paimon::DataSplit; use crate::error::to_datafusion_error; +use crate::filter_pushdown::ScanFetchMode; + +#[derive(Debug)] +pub(crate) struct PartitionPlan { + planned_partitions: Vec>, + partition_fetches: Vec>, +} + +#[derive(Debug, Clone)] +pub(crate) struct PaimonTableScanSpec { + pub table: Table, + pub projected_columns: Option>, + pub pushed_predicate: Option, + pub full_splits: Arc<[DataSplit]>, + pub target_partitions: usize, + pub scan_fetch_mode: ScanFetchMode, + pub limit_hint: Option, + pub fetch: Option, +} + +fn round_robin_buckets(items: Vec, num_buckets: usize) -> Vec> { + let mut buckets: Vec> = (0..num_buckets).map(|_| Vec::new()).collect(); + for (index, item) in items.into_iter().enumerate() { + buckets[index % num_buckets].push(item); + } + buckets +} + +fn empty_partition_plan() -> PartitionPlan { + PartitionPlan { + planned_partitions: vec![Arc::from(Vec::new())], + partition_fetches: vec![None], + } +} + +fn build_round_robin_plan(splits: Vec, target_partitions: usize) -> PartitionPlan { + if splits.is_empty() { + return empty_partition_plan(); + } + + let num_partitions = splits.len().min(target_partitions.max(1)); + let buckets = round_robin_buckets(splits, num_partitions); + + PartitionPlan { + planned_partitions: buckets.into_iter().map(Arc::from).collect(), + partition_fetches: vec![None; num_partitions], + } +} + +fn build_exact_fetch_plan( + splits: Vec, + target_partitions: usize, + fetch: usize, +) -> PartitionPlan { + if splits.is_empty() { + return empty_partition_plan(); + } + + if target_partitions <= 1 { + return PartitionPlan { + planned_partitions: vec![Arc::from(splits)], + partition_fetches: vec![Some(fetch)], + }; + } + + let mut prefix_splits = Vec::new(); + let mut prefix_rows = Vec::new(); + let mut remaining = fetch; + let mut tail_start = None; + + for (index, split) in splits.iter().enumerate() { + match split.merged_row_count() { + Some(count) if count >= 0 && (count as usize) <= remaining => { + prefix_splits.push(split.clone()); + prefix_rows.push(count as usize); + remaining -= count as usize; + if remaining == 0 { + break; + } + } + _ => { + tail_start = Some(index); + break; + } + } + } + + if tail_start.is_none() { + let num_partitions = prefix_splits.len().min(target_partitions.max(1)); + let split_buckets = round_robin_buckets(prefix_splits, num_partitions); + let row_buckets = round_robin_buckets(prefix_rows, num_partitions); + + return PartitionPlan { + planned_partitions: split_buckets.into_iter().map(Arc::from).collect(), + partition_fetches: row_buckets + .into_iter() + .map(|rows| Some(rows.into_iter().sum())) + .collect(), + }; + } + + let tail_start = tail_start.expect("tail_start checked above"); + let tail_fetch = remaining; + + if prefix_splits.is_empty() { + return PartitionPlan { + planned_partitions: vec![Arc::from(splits)], + partition_fetches: vec![Some(fetch)], + }; + } + + let prefix_partition_count = prefix_splits + .len() + .min(target_partitions.saturating_sub(1).max(1)); + let split_buckets = round_robin_buckets(prefix_splits, prefix_partition_count); + let row_buckets = round_robin_buckets(prefix_rows, prefix_partition_count); + + let mut planned_partitions: Vec> = + split_buckets.into_iter().map(Arc::from).collect(); + let mut partition_fetches: Vec> = row_buckets + .into_iter() + .map(|rows| Some(rows.into_iter().sum())) + .collect(); + + planned_partitions.push(Arc::from(splits[tail_start..].to_vec())); + partition_fetches.push(Some(tail_fetch)); + + PartitionPlan { + planned_partitions, + partition_fetches, + } +} + +pub(crate) fn build_partition_plan( + full_splits: &[DataSplit], + target_partitions: usize, + limit_hint: Option, + fetch: Option, +) -> PartitionPlan { + if let Some(fetch) = fetch { + let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), Some(fetch)); + build_exact_fetch_plan(selected_splits, target_partitions, fetch) + } else { + let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), limit_hint); + build_round_robin_plan(selected_splits, target_partitions) + } +} + +struct ExactFetchStream { + input: SendableRecordBatchStream, + remaining: usize, +} + +impl ExactFetchStream { + fn new(input: SendableRecordBatchStream, remaining: usize) -> Self { + Self { input, remaining } + } +} + +impl Stream for ExactFetchStream { + type Item = DFResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.remaining == 0 { + return Poll::Ready(None); + } + + let next = futures::ready!(self.input.as_mut().poll_next(cx)); + match next { + Some(Ok(batch)) => { + let batch = if batch.num_rows() > self.remaining { + batch.slice(0, self.remaining) + } else { + batch + }; + self.remaining -= batch.num_rows(); + Poll::Ready(Some(Ok(batch))) + } + Some(Err(error)) => Poll::Ready(Some(Err(error))), + None => Poll::Ready(None), + } + } +} + +impl RecordBatchStream for ExactFetchStream { + fn schema(&self) -> ArrowSchemaRef { + self.input.schema() + } +} /// Execution plan that scans a Paimon table with optional column projection. /// /// Planning is performed eagerly in [`super::super::table::PaimonTableProvider::scan`], /// and the resulting splits are distributed across DataFusion execution partitions /// so that DataFusion can schedule them in parallel. -#[derive(Debug)] -pub struct PaimonTableScan { +#[derive(Debug, Clone)] +pub(crate) struct PaimonTableScan { table: Table, /// Projected column names (if None, reads all columns). projected_columns: Option>, - /// Filter translated from DataFusion expressions and reused during execute() - /// so reader-side pruning reaches the actual read path. + /// Filter translated from DataFusion expressions and reused during execute(). pushed_predicate: Option, - /// Pre-planned partition assignments: `planned_partitions[i]` contains the - /// Paimon splits that DataFusion partition `i` will read. - /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`. + /// Full split plan after filter pushdown, before any row-count hint is applied. + full_splits: Arc<[DataSplit]>, + /// Split assignments per DataFusion partition. planned_partitions: Vec>, + partition_fetches: Vec>, + target_partitions: usize, plan_properties: PlanProperties, - /// Optional limit on the number of rows to return. - limit: Option, + scan_fetch_mode: ScanFetchMode, + limit_hint: Option, + fetch: Option, } impl PaimonTableScan { - pub(crate) fn new( + pub(crate) fn new(schema: ArrowSchemaRef, spec: PaimonTableScanSpec) -> Self { + let PartitionPlan { + planned_partitions, + partition_fetches, + } = build_partition_plan( + &spec.full_splits, + spec.target_partitions, + spec.limit_hint, + spec.fetch, + ); + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + Partitioning::UnknownPartitioning(planned_partitions.len()), + EmissionType::Incremental, + Boundedness::Bounded, + ); + Self { + table: spec.table, + projected_columns: spec.projected_columns, + pushed_predicate: spec.pushed_predicate, + full_splits: spec.full_splits, + planned_partitions, + partition_fetches, + target_partitions: spec.target_partitions, + plan_properties, + scan_fetch_mode: spec.scan_fetch_mode, + limit_hint: spec.limit_hint, + fetch: spec.fetch, + } + } + + #[cfg(test)] + pub(crate) fn new_with_planned_partitions( schema: ArrowSchemaRef, - table: Table, - projected_columns: Option>, - pushed_predicate: Option, + spec: PaimonTableScanSpec, planned_partitions: Vec>, - limit: Option, + partition_fetches: Vec>, ) -> Self { + let full_splits = planned_partitions + .iter() + .flat_map(|partition| partition.iter().cloned()) + .collect::>(); + let target_partitions = planned_partitions.len().max(1); let plan_properties = PlanProperties::new( EquivalenceProperties::new(schema.clone()), Partitioning::UnknownPartitioning(planned_partitions.len()), @@ -70,19 +301,20 @@ impl PaimonTableScan { Boundedness::Bounded, ); Self { - table, - projected_columns, - pushed_predicate, + table: spec.table, + projected_columns: spec.projected_columns, + pushed_predicate: spec.pushed_predicate, + full_splits: Arc::from(full_splits), planned_partitions, + partition_fetches, + target_partitions, plan_properties, - limit, + scan_fetch_mode: spec.scan_fetch_mode, + limit_hint: spec.limit_hint, + fetch: spec.fetch, } } - pub fn table(&self) -> &Table { - &self.table - } - #[cfg(test)] pub(crate) fn planned_partitions(&self) -> &[Arc<[DataSplit]>] { &self.planned_partitions @@ -93,8 +325,9 @@ impl PaimonTableScan { self.pushed_predicate.as_ref() } - pub fn limit(&self) -> Option { - self.limit + #[cfg(test)] + pub(crate) fn limit_hint(&self) -> Option { + self.limit_hint } } @@ -122,6 +355,35 @@ impl ExecutionPlan for PaimonTableScan { Ok(self) } + fn with_fetch(&self, fetch: Option) -> Option> { + if self.scan_fetch_mode != ScanFetchMode::Exact { + return None; + } + + let fetch = fetch.filter(|fetch| *fetch > 0); + if self.fetch == fetch { + return Some(Arc::new(self.clone())); + } + + Some(Arc::new(Self::new( + self.schema(), + PaimonTableScanSpec { + table: self.table.clone(), + projected_columns: self.projected_columns.clone(), + pushed_predicate: self.pushed_predicate.clone(), + full_splits: Arc::clone(&self.full_splits), + target_partitions: self.target_partitions, + scan_fetch_mode: self.scan_fetch_mode, + limit_hint: fetch, + fetch, + }, + ))) + } + + fn fetch(&self) -> Option { + self.fetch + } + fn execute( &self, partition: usize, @@ -133,6 +395,12 @@ impl ExecutionPlan for PaimonTableScan { self.planned_partitions.len() )) })?); + let partition_fetch = *self.partition_fetches.get(partition).ok_or_else(|| { + datafusion::error::DataFusionError::Internal(format!( + "PaimonTableScan: partition fetch index {partition} out of range (total {})", + self.partition_fetches.len() + )) + })?; let table = self.table.clone(); let schema = self.schema(); @@ -160,10 +428,16 @@ impl ExecutionPlan for PaimonTableScan { )) }; - Ok(Box::pin(RecordBatchStreamAdapter::new( + let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(fut).try_flatten(), - ))) + )); + + if let Some(fetch) = partition_fetch { + Ok(Box::pin(ExactFetchStream::new(stream, fetch))) + } else { + Ok(stream) + } } } @@ -178,8 +452,11 @@ impl DisplayAs for PaimonTableScan { "PaimonTableScan: partitions={}", self.planned_partitions.len() )?; - if let Some(limit) = self.limit { - write!(f, ", limit={limit}")?; + if let Some(limit_hint) = self.limit_hint { + write!(f, ", limit_hint={limit_hint}")?; + } + if let Some(fetch) = self.fetch { + write!(f, ", fetch={fetch}")?; } Ok(()) } @@ -202,6 +479,7 @@ mod tests { use paimon::spec::{ BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema, }; + use paimon::DeletionFile; use std::fs; use tempfile::tempdir; use test_utils::{local_file_path, test_data_file, write_int_parquet_file}; @@ -217,13 +495,20 @@ mod tests { #[test] fn test_partition_count_empty_plan() { let schema = test_schema(); - let scan = PaimonTableScan::new( + let scan = PaimonTableScan::new_with_planned_partitions( schema, - dummy_table(), - None, - None, + PaimonTableScanSpec { + table: dummy_table(), + projected_columns: None, + pushed_predicate: None, + full_splits: Arc::from(Vec::new()), + target_partitions: 1, + scan_fetch_mode: ScanFetchMode::Disabled, + limit_hint: None, + fetch: None, + }, vec![Arc::from(Vec::new())], - None, + vec![None], ); assert_eq!(scan.properties().output_partitioning().partition_count(), 1); } @@ -236,11 +521,199 @@ mod tests { Arc::from(Vec::new()), Arc::from(Vec::new()), ]; - let scan = - PaimonTableScan::new(schema, dummy_table(), None, None, planned_partitions, None); + let scan = PaimonTableScan::new_with_planned_partitions( + schema, + PaimonTableScanSpec { + table: dummy_table(), + projected_columns: None, + pushed_predicate: None, + full_splits: Arc::from(Vec::new()), + target_partitions: 3, + scan_fetch_mode: ScanFetchMode::Disabled, + limit_hint: None, + fetch: None, + }, + planned_partitions, + vec![None, None, None], + ); assert_eq!(scan.properties().output_partitioning().partition_count(), 3); } + #[test] + fn test_scan_exposes_fetch_capability_only_when_safe() { + let schema = test_schema(); + let exact_scan = PaimonTableScan::new_with_planned_partitions( + schema, + PaimonTableScanSpec { + table: dummy_table(), + projected_columns: None, + pushed_predicate: None, + full_splits: Arc::from(Vec::new()), + target_partitions: 1, + scan_fetch_mode: ScanFetchMode::Exact, + limit_hint: Some(10), + fetch: None, + }, + vec![Arc::from(Vec::new())], + vec![None], + ); + let disabled_scan = PaimonTableScan::new_with_planned_partitions( + test_schema(), + PaimonTableScanSpec { + table: dummy_table(), + projected_columns: None, + pushed_predicate: None, + full_splits: Arc::from(Vec::new()), + target_partitions: 1, + scan_fetch_mode: ScanFetchMode::Disabled, + limit_hint: Some(10), + fetch: None, + }, + vec![Arc::from(Vec::new())], + vec![None], + ); + + let fetched_scan = ExecutionPlan::with_fetch(&exact_scan, Some(10)) + .expect("safe scans should accept physical fetch pushdown"); + let fetched_scan = fetched_scan + .as_any() + .downcast_ref::() + .expect("with_fetch should return a rebuilt PaimonTableScan"); + + assert_eq!(ExecutionPlan::fetch(fetched_scan), Some(10)); + assert_eq!(fetched_scan.limit_hint(), Some(10)); + assert!(ExecutionPlan::with_fetch(&disabled_scan, Some(10)).is_none()); + } + + fn plan_file_names(partitions: &[Arc<[DataSplit]>]) -> Vec> { + partitions + .iter() + .map(|partition| { + partition + .iter() + .map(|split| split.data_files()[0].file_name.clone()) + .collect() + }) + .collect() + } + + fn counted_split(file_name: &str, row_count: i64) -> DataSplit { + paimon::DataSplitBuilder::new() + .with_snapshot(1) + .with_partition(BinaryRow::new(0)) + .with_bucket(0) + .with_bucket_path(format!("file:/tmp/{file_name}")) + .with_total_buckets(1) + .with_data_files(vec![test_data_file(file_name, row_count)]) + .with_raw_convertible(true) + .build() + .unwrap() + } + + fn unknown_count_split(file_name: &str, row_count: i64) -> DataSplit { + paimon::DataSplitBuilder::new() + .with_snapshot(1) + .with_partition(BinaryRow::new(0)) + .with_bucket(0) + .with_bucket_path(format!("file:/tmp/{file_name}")) + .with_total_buckets(1) + .with_data_files(vec![test_data_file(file_name, row_count)]) + .with_data_deletion_files(vec![Some(DeletionFile::new( + format!("file:/tmp/{file_name}.dv"), + 0, + 0, + None, + ))]) + .with_raw_convertible(true) + .build() + .unwrap() + } + + #[test] + fn test_build_partition_plan_assigns_unknown_merged_row_count_to_tail_fetch_partition() { + let full_splits = vec![ + counted_split("a.parquet", 2), + unknown_count_split("b.parquet", 4), + counted_split("c.parquet", 3), + ]; + + let plan = build_partition_plan(&full_splits, 3, None, Some(3)); + + assert_eq!( + plan_file_names(&plan.planned_partitions), + vec![ + vec!["a.parquet".to_string()], + vec!["b.parquet".to_string(), "c.parquet".to_string()], + ] + ); + assert_eq!(plan.partition_fetches, vec![Some(2), Some(1)]); + } + + #[test] + fn test_build_exact_fetch_plan_keeps_large_first_split_in_single_partition() { + let plan = build_exact_fetch_plan( + vec![ + counted_split("large.parquet", 10), + counted_split("tail.parquet", 2), + ], + 4, + 3, + ); + + assert_eq!( + plan_file_names(&plan.planned_partitions), + vec![vec![ + "large.parquet".to_string(), + "tail.parquet".to_string() + ]] + ); + assert_eq!(plan.partition_fetches, vec![Some(3)]); + } + + #[test] + fn test_build_exact_fetch_plan_preserves_zero_row_prefix_budget() { + let plan = build_exact_fetch_plan( + vec![ + counted_split("zero.parquet", 0), + counted_split("tail.parquet", 5), + ], + 2, + 3, + ); + + assert_eq!( + plan_file_names(&plan.planned_partitions), + vec![ + vec!["zero.parquet".to_string()], + vec!["tail.parquet".to_string()], + ] + ); + assert_eq!(plan.partition_fetches, vec![Some(0), Some(3)]); + } + + #[test] + fn test_build_exact_fetch_plan_splits_prefix_and_tail_fetches_across_partitions() { + let plan = build_exact_fetch_plan( + vec![ + counted_split("a.parquet", 2), + counted_split("b.parquet", 2), + counted_split("c.parquet", 10), + ], + 3, + 5, + ); + + assert_eq!( + plan_file_names(&plan.planned_partitions), + vec![ + vec!["a.parquet".to_string()], + vec!["b.parquet".to_string()], + vec!["c.parquet".to_string()], + ] + ); + assert_eq!(plan.partition_fetches, vec![Some(2), Some(2), Some(1)]); + } + /// Constructs a minimal Table for testing (no real files needed since we /// only test PlanProperties, not actual reads). fn dummy_table() -> Table { @@ -306,11 +779,16 @@ mod tests { )])); let scan = PaimonTableScan::new( schema, - table, - Some(vec!["id".to_string()]), - Some(pushed_predicate), - vec![Arc::from(vec![split])], - None, + PaimonTableScanSpec { + table, + projected_columns: Some(vec!["id".to_string()]), + pushed_predicate: Some(pushed_predicate), + full_splits: Arc::from(vec![split]), + target_partitions: 1, + scan_fetch_mode: ScanFetchMode::Disabled, + limit_hint: None, + fetch: None, + }, ); let ctx = SessionContext::new(); diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index a76174dd..5894d375 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -30,7 +30,10 @@ use datafusion::physical_plan::ExecutionPlan; use paimon::table::Table; use crate::error::to_datafusion_error; -use crate::filter_pushdown::{build_pushed_predicate, classify_filter_pushdown}; +use crate::filter_pushdown::{ + build_pushed_predicate, classify_filter_pushdown, scan_fetch_mode, ScanFetchMode, +}; +use crate::physical_plan::scan::PaimonTableScanSpec; use crate::physical_plan::PaimonTableScan; use crate::runtime::await_with_runtime; @@ -65,15 +68,6 @@ impl PaimonTableProvider { } } -/// Distribute `items` into `num_buckets` groups using round-robin assignment. -fn bucket_round_robin(items: Vec, num_buckets: usize) -> Vec> { - let mut buckets: Vec> = (0..num_buckets).map(|_| Vec::new()).collect(); - for (i, item) in items.into_iter().enumerate() { - buckets[i % num_buckets].push(item); - } - buckets -} - #[async_trait] impl TableProvider for PaimonTableProvider { fn as_any(&self) -> &dyn Any { @@ -113,11 +107,13 @@ impl TableProvider for PaimonTableProvider { if let Some(filter) = pushed_predicate.clone() { read_builder.with_filter(filter); } - // Push the limit hint to paimon-core planning to reduce splits when possible. - // DataFusion still enforces the final LIMIT semantics. - if let Some(limit) = limit { - read_builder.with_limit(limit); - } + let scan_fetch_mode = scan_fetch_mode( + filters, + self.table.schema().fields(), + self.table.schema().partition_keys(), + ); + let limit_hint = + limit.filter(|limit| *limit > 0 && scan_fetch_mode == ScanFetchMode::Exact); let scan = read_builder.new_scan(); // DataFusion's Python FFI may poll `TableProvider::scan()` without an active // Tokio runtime. `scan.plan()` can reach OpenDAL/Tokio filesystem calls while @@ -127,29 +123,21 @@ impl TableProvider for PaimonTableProvider { .await .map_err(to_datafusion_error)?; - // Distribute splits across DataFusion partitions, capped by the - // session's target_partitions to avoid over-sharding with many small splits. - // Each partition's splits are wrapped in Arc to avoid deep-cloning in execute(). - let splits = plan.splits().to_vec(); - let planned_partitions: Vec> = if splits.is_empty() { - // Empty plans get a single empty partition to avoid 0-partition edge cases. - vec![Arc::from(Vec::new())] - } else { - let target = state.config_options().execution.target_partitions; - let num_partitions = splits.len().min(target.max(1)); - bucket_round_robin(splits, num_partitions) - .into_iter() - .map(Arc::from) - .collect() - }; + let target_partitions = state.config_options().execution.target_partitions.max(1); + let full_splits = Arc::from(plan.splits().to_vec()); Ok(Arc::new(PaimonTableScan::new( projected_schema, - self.table.clone(), - projected_columns, - pushed_predicate, - planned_partitions, - limit, + PaimonTableScanSpec { + table: self.table.clone(), + projected_columns, + pushed_predicate, + full_splits, + target_partitions, + scan_fetch_mode, + limit_hint, + fetch: None, + }, ))) } @@ -175,30 +163,13 @@ mod tests { use datafusion::datasource::TableProvider; use datafusion::logical_expr::{col, lit, Expr}; + use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{SessionConfig, SessionContext}; use paimon::catalog::Identifier; use paimon::{Catalog, CatalogOptions, DataSplit, FileSystemCatalog, Options}; use crate::physical_plan::PaimonTableScan; - #[test] - fn test_bucket_round_robin_distributes_evenly() { - let result = bucket_round_robin(vec![0, 1, 2, 3, 4], 3); - assert_eq!(result, vec![vec![0, 3], vec![1, 4], vec![2]]); - } - - #[test] - fn test_bucket_round_robin_fewer_items_than_buckets() { - let result = bucket_round_robin(vec![10, 20], 2); - assert_eq!(result, vec![vec![10], vec![20]]); - } - - #[test] - fn test_bucket_round_robin_single_bucket() { - let result = bucket_round_robin(vec![1, 2, 3], 1); - assert_eq!(result, vec![vec![1, 2, 3]]); - } - fn get_test_warehouse() -> String { std::env::var("PAIMON_TEST_WAREHOUSE") .unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string()) @@ -225,14 +196,9 @@ mod tests { async fn plan_partitions( provider: &PaimonTableProvider, filters: Vec, + limit: Option, ) -> Vec> { - let config = SessionConfig::new().with_target_partitions(8); - let ctx = SessionContext::new_with_config(config); - let state = ctx.state(); - let plan = provider - .scan(&state, None, &filters, None) - .await - .expect("scan() should succeed"); + let plan = plan_scan(provider, filters, limit).await; let scan = plan .as_any() .downcast_ref::() @@ -241,6 +207,20 @@ mod tests { scan.planned_partitions().to_vec() } + async fn plan_scan( + provider: &PaimonTableProvider, + filters: Vec, + limit: Option, + ) -> Arc { + let config = SessionConfig::new().with_target_partitions(8); + let ctx = SessionContext::new_with_config(config); + let state = ctx.state(); + provider + .scan(&state, None, &filters, limit) + .await + .expect("scan() should succeed") + } + fn extract_dt_partition_set(planned_partitions: &[Arc<[DataSplit]>]) -> BTreeSet { planned_partitions .iter() @@ -278,7 +258,7 @@ mod tests { async fn test_scan_partition_filter_plans_matching_partition_set() { let provider = create_provider("partitioned_log_table").await; let planned_partitions = - plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))]).await; + plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))], None).await; assert_eq!( extract_dt_partition_set(&planned_partitions), @@ -292,6 +272,7 @@ mod tests { let planned_partitions = plan_partitions( &provider, vec![col("dt").eq(lit("2024-01-01")).and(col("id").gt(lit(1)))], + None, ) .await; @@ -306,10 +287,11 @@ mod tests { let provider = create_provider("multi_partitioned_log_table").await; let dt_only_partitions = - plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))]).await; + plan_partitions(&provider, vec![col("dt").eq(lit("2024-01-01"))], None).await; let dt_hr_partitions = plan_partitions( &provider, vec![col("dt").eq(lit("2024-01-01")).and(col("hr").eq(lit(10)))], + None, ) .await; @@ -348,4 +330,34 @@ mod tests { assert_eq!(scan.pushed_predicate(), Some(&expected)); } + + #[tokio::test] + async fn test_scan_applies_limit_hint_only_when_safe() { + let provider = create_provider("partitioned_log_table").await; + let full_plan = plan_partitions(&provider, vec![], None).await; + + let config = SessionConfig::new().with_target_partitions(8); + let ctx = SessionContext::new_with_config(config); + let state = ctx.state(); + let plan = provider + .scan(&state, None, &[], Some(1)) + .await + .expect("scan() should succeed"); + let scan = plan + .as_any() + .downcast_ref::() + .expect("Expected PaimonTableScan"); + + assert_eq!(scan.limit_hint(), Some(1)); + assert!( + scan.planned_partitions() + .iter() + .map(|partition| partition.len()) + .sum::() + < full_plan + .iter() + .map(|partition| partition.len()) + .sum::() + ); + } } diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index d3966ca6..d0ca7f45 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -22,6 +22,7 @@ use datafusion::arrow::array::{Array, Int32Array, StringArray}; use datafusion::catalog::CatalogProvider; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{col, lit, TableProviderFilterPushDown}; +use datafusion::physical_plan::{displayable, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use paimon::catalog::Identifier; use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; @@ -97,6 +98,14 @@ async fn collect_query( ctx.sql(sql).await?.collect().await } +async fn create_physical_plan( + table_name: &str, + sql: &str, +) -> datafusion::error::Result> { + let ctx = create_context(table_name).await; + ctx.sql(sql).await?.create_physical_plan().await +} + fn extract_id_name_rows( batches: &[datafusion::arrow::record_batch::RecordBatch], ) -> Vec<(i32, String)> { @@ -121,6 +130,17 @@ fn extract_id_name_rows( rows } +fn format_physical_plan(plan: &Arc) -> String { + displayable(plan.as_ref()).indent(true).to_string() +} + +fn paimon_scan_lines(plan_text: &str) -> Vec<&str> { + plan_text + .lines() + .filter(|line| line.contains("PaimonTableScan:")) + .collect() +} + #[tokio::test] async fn test_read_log_table_via_datafusion() { let actual_rows = read_rows("simple_log_table").await; @@ -302,52 +322,149 @@ async fn test_mixed_and_filter_keeps_residual_datafusion_filter() { assert_eq!(actual_rows, vec![(2, "bob".to_string())]); } -/// Test limit pushdown: ensures that LIMIT queries return the correct number of rows. #[tokio::test] -async fn test_limit_pushdown() { - // Test append-only table (simple_log_table) - { - let batches = collect_query( - "simple_log_table", - "SELECT id, name FROM simple_log_table LIMIT 2", - ) +async fn test_limit_pushdown_on_data_evolution_table_returns_merged_rows() { + let batches = collect_query( + "data_evolution_table", + "SELECT id, name FROM data_evolution_table LIMIT 3", + ) + .await + .expect("Limit query on data evolution table should succeed"); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, 3, + "LIMIT 3 should return exactly 3 rows for data evolution table" + ); + + let mut rows = extract_id_name_rows(&batches); + rows.sort_by_key(|(id, _)| *id); + + assert_eq!( + rows, + vec![ + (1, "alice-v2".to_string()), + (2, "bob".to_string()), + (3, "carol-v2".to_string()), + ], + "Data evolution table LIMIT 3 should return merged rows" + ); +} + +#[tokio::test] +async fn test_limit_pushdown_applies_safe_scan_fetch_contract_and_keeps_correctness() { + let sql = "SELECT id, name FROM simple_log_table LIMIT 2"; + let plan = create_physical_plan("simple_log_table", sql) .await - .expect("Limit query should succeed"); + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); - let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - assert_eq!(total_rows, 2, "LIMIT 2 should return exactly 2 rows"); - } + assert!( + scan_lines + .iter() + .any(|line| line.contains("limit_hint=2") && line.contains("fetch=2")), + "Safe LIMIT query should push DataFusion physical fetch into PaimonTableScan, plan:\n{plan_text}" + ); - // Test data evolution table - { - let batches = collect_query( - "data_evolution_table", - "SELECT id, name FROM data_evolution_table LIMIT 3", - ) + let batches = collect_query("simple_log_table", sql) .await - .expect("Limit query on data evolution table should succeed"); + .expect("LIMIT query should succeed"); + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!(total_rows, 2, "LIMIT 2 should still return exactly 2 rows"); +} - let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); - assert_eq!( - total_rows, 3, - "LIMIT 3 should return exactly 3 rows for data evolution table" - ); +#[tokio::test] +async fn test_offset_limit_pushdown_applies_scan_fetch_contract_and_keeps_correctness() { + let sql = "SELECT id, name FROM partitioned_log_table OFFSET 1 LIMIT 1"; + let plan = create_physical_plan("partitioned_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); - // Verify the data is from the merged result (not raw files) - let mut rows = extract_id_name_rows(&batches); - rows.sort_by_key(|(id, _)| *id); + assert!( + plan_text.contains("GlobalLimitExec"), + "OFFSET queries should keep a GlobalLimitExec in DataFusion, plan:\n{plan_text}" + ); + assert!( + scan_lines + .iter() + .any(|line| line.contains("limit_hint=2") && line.contains("fetch=2")), + "OFFSET + LIMIT should rebuild the scan with fetch=skip+fetch, plan:\n{plan_text}" + ); - // LIMIT 3 returns ids 1, 2, 3 with merged values - assert_eq!( - rows, - vec![ - (1, "alice-v2".to_string()), - (2, "bob".to_string()), - (3, "carol-v2".to_string()), - ], - "Data evolution table LIMIT 3 should return merged rows" - ); - } + let batches = collect_query("partitioned_log_table", sql) + .await + .expect("OFFSET + LIMIT query should succeed"); + + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!( + total_rows, 1, + "OFFSET 1 LIMIT 1 should still return exactly 1 row" + ); +} + +#[tokio::test] +async fn test_inexact_filter_limit_disables_scan_limit_hint_and_keeps_correctness() { + let sql = "SELECT id, name FROM partitioned_log_table WHERE id > 1 LIMIT 1"; + let plan = create_physical_plan("partitioned_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); + + assert!( + !scan_lines.is_empty(), + "plan should contain a PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines + .iter() + .all(|line| !line.contains("limit_hint=") && !line.contains("fetch=")), + "Inexact filter queries should fail open without scan fetch pushdown, plan:\n{plan_text}" + ); + + let batches = collect_query("partitioned_log_table", sql) + .await + .expect("Inexact filter + LIMIT query should succeed"); + let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); + assert_eq!( + total_rows, 1, + "Inexact filter + LIMIT should still return exactly 1 row" + ); +} + +#[tokio::test] +async fn test_residual_filter_limit_disables_scan_limit_hint_and_keeps_correctness() { + let sql = "SELECT id, name FROM simple_log_table WHERE id + 1 > 3 LIMIT 1"; + let plan = create_physical_plan("simple_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); + + assert!( + !scan_lines.is_empty(), + "plan should contain a PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines + .iter() + .all(|line| !line.contains("limit_hint=") && !line.contains("fetch=")), + "Residual filter queries should fail open without scan fetch pushdown, plan:\n{plan_text}" + ); + + let batches = collect_query("simple_log_table", sql) + .await + .expect("Residual filter + LIMIT query should succeed"); + let rows = extract_id_name_rows(&batches); + + assert_eq!( + rows, + vec![(3, "carol".to_string())], + "Residual filter + LIMIT should still return the matching row" + ); } // ======================= Catalog Provider Tests ======================= diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 481c757b..57e71c7e 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -34,7 +34,7 @@ pub use read_builder::{ReadBuilder, TableRead}; pub use schema_manager::SchemaManager; pub use snapshot_manager::SnapshotManager; pub use source::{DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan}; -pub use table_scan::TableScan; +pub use table_scan::{prune_splits_by_limit_hint, TableScan}; pub use tag_manager::TagManager; use crate::catalog::Identifier; diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index f014b4b7..7bf953cc 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -306,6 +306,45 @@ pub struct TableScan<'a> { limit: Option, } +/// Apply a limit-pushdown hint to the generated splits. +/// +/// Iterates through splits and accumulates `merged_row_count()` until the +/// limit hint is reached. Returns only the splits likely needed to satisfy +/// that hint. +/// +/// This does not guarantee an exact final row count. If a split's +/// `merged_row_count()` is `None` (for example because of unknown deletion +/// cardinality), that split is kept even though its contribution to the +/// limit is unknown. Planning may still stop early later if the +/// accumulated known `merged_row_count()` reaches the limit, and the +/// caller or query engine must enforce the final LIMIT. +pub fn prune_splits_by_limit_hint( + splits: impl IntoIterator, + limit_hint: Option, +) -> Vec { + let Some(limit_hint) = limit_hint else { + return splits.into_iter().collect(); + }; + + let mut limited_splits = Vec::new(); + let mut scanned_row_count: i64 = 0; + + for split in splits { + match split.merged_row_count() { + Some(merged_count) => { + limited_splits.push(split); + scanned_row_count += merged_count; + if scanned_row_count >= limit_hint as i64 { + return limited_splits; + } + } + None => limited_splits.push(split), + } + } + + limited_splits +} + impl<'a> TableScan<'a> { pub fn new( table: &'a Table, @@ -374,51 +413,8 @@ impl<'a> TableScan<'a> { } } - /// Apply a limit-pushdown hint to the generated splits. - /// - /// Iterates through splits and accumulates `merged_row_count()` until the - /// limit hint is reached. Returns only the splits likely needed to satisfy - /// that hint. - /// - /// This does not guarantee an exact final row count. If a split's - /// `merged_row_count()` is `None` (for example because of unknown deletion - /// cardinality), that split is kept even though its contribution to the - /// limit is unknown. Planning may still stop early later if the - /// accumulated known `merged_row_count()` reaches the limit, and the - /// caller or query engine must enforce the final LIMIT. fn apply_limit_pushdown(&self, splits: Vec) -> Vec { - let limit = match self.limit { - Some(l) => l, - None => return splits, - }; - - if splits.is_empty() { - return splits; - } - - let mut limited_splits = Vec::new(); - let mut scanned_row_count: i64 = 0; - - for split in splits { - match split.merged_row_count() { - Some(merged_count) => { - limited_splits.push(split); - scanned_row_count += merged_count; - if scanned_row_count >= limit as i64 { - // We likely have enough rows for the limit hint. - return limited_splits; - } - } - None => { - // Can't compute merged row count, so keep this split and - // rely on the caller or query engine to enforce the final - // LIMIT. - limited_splits.push(split); - } - } - } - - limited_splits + prune_splits_by_limit_hint(splits, self.limit) } async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { @@ -686,14 +682,14 @@ impl<'a> TableScan<'a> { #[cfg(test)] mod tests { - use super::partition_matches_predicate; + use super::{partition_matches_predicate, prune_splits_by_limit_hint}; use crate::spec::{ - stats::BinaryTableStats, ArrayType, BinaryRowBuilder, DataField, DataFileMeta, DataType, - Datum, DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, IntType, Predicate, - PredicateBuilder, PredicateOperator, VarCharType, + stats::BinaryTableStats, ArrayType, BinaryRow, BinaryRowBuilder, DataField, DataFileMeta, + DataType, Datum, DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, IntType, + Predicate, PredicateBuilder, PredicateOperator, VarCharType, }; use crate::table::bucket_filter::{compute_target_buckets, extract_predicate_for_keys}; - use crate::table::source::DeletionFile; + use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile}; use crate::table::stats_filter::{data_file_matches_predicates, group_by_overlapping_row_id}; use crate::Error; use chrono::{DateTime, Utc}; @@ -808,6 +804,82 @@ mod tests { } } + fn limit_test_split(file_name: &str, row_count: i64) -> DataSplit { + let mut file = test_data_file_meta(Vec::new(), Vec::new(), Vec::new(), row_count); + file.file_name = file_name.to_string(); + + DataSplitBuilder::new() + .with_snapshot(1) + .with_partition(BinaryRow::new(0)) + .with_bucket(0) + .with_bucket_path(format!("file:/tmp/{file_name}")) + .with_total_buckets(1) + .with_data_files(vec![file]) + .with_raw_convertible(true) + .build() + .unwrap() + } + + fn limit_test_split_with_unknown_merged_row_count( + file_name: &str, + row_count: i64, + ) -> DataSplit { + let mut file = test_data_file_meta(Vec::new(), Vec::new(), Vec::new(), row_count); + file.file_name = file_name.to_string(); + + DataSplitBuilder::new() + .with_snapshot(1) + .with_partition(BinaryRow::new(0)) + .with_bucket(0) + .with_bucket_path(format!("file:/tmp/{file_name}")) + .with_total_buckets(1) + .with_data_files(vec![file]) + .with_data_deletion_files(vec![Some(DeletionFile::new( + format!("file:/tmp/{file_name}.dv"), + 0, + 0, + None, + ))]) + .with_raw_convertible(true) + .build() + .unwrap() + } + + fn split_file_names(splits: &[DataSplit]) -> Vec<&str> { + splits + .iter() + .map(|split| split.data_files()[0].file_name.as_str()) + .collect() + } + + #[test] + fn test_prune_splits_by_limit_hint_keeps_unknown_merged_row_count() { + let splits = vec![ + limit_test_split("a.parquet", 2), + limit_test_split_with_unknown_merged_row_count("b.parquet", 4), + limit_test_split("c.parquet", 3), + ]; + + let pruned = prune_splits_by_limit_hint(splits, Some(3)); + + assert_eq!( + split_file_names(&pruned), + vec!["a.parquet", "b.parquet", "c.parquet"] + ); + } + + #[test] + fn test_prune_splits_by_limit_hint_stops_after_first_large_split() { + let splits = vec![ + limit_test_split("large.parquet", 10), + limit_test_split("tail.parquet", 2), + ]; + + let pruned = prune_splits_by_limit_hint(splits, Some(3)); + + assert_eq!(split_file_names(&pruned), vec!["large.parquet"]); + } + #[test] fn test_partition_matches_predicate_decode_failure_fails_open() { let predicate = PredicateBuilder::new(&partition_string_field()) From 9ba6eae70fcaf95e3dc9d85d63d417f40a2aecd9 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Tue, 7 Apr 2026 20:40:33 +0800 Subject: [PATCH 2/7] fix(datafusion): tighten fetch contract edge cases --- .../datafusion/src/physical_plan/scan.rs | 28 +++++++++++++++++-- .../integrations/datafusion/src/table/mod.rs | 6 ++-- crates/paimon/src/table/table_scan.rs | 21 ++++++++++---- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 2e2ec1fe..1a678512 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -139,7 +139,7 @@ fn build_exact_fetch_plan( }; } - let tail_start = tail_start.expect("tail_start checked above"); + let tail_start = tail_start.unwrap(); let tail_fetch = remaining; if prefix_splits.is_empty() { @@ -178,6 +178,12 @@ pub(crate) fn build_partition_plan( fetch: Option, ) -> PartitionPlan { if let Some(fetch) = fetch { + if fetch == 0 { + return PartitionPlan { + planned_partitions: vec![Arc::from(Vec::new())], + partition_fetches: vec![Some(0)], + }; + } let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), Some(fetch)); build_exact_fetch_plan(selected_splits, target_partitions, fetch) } else { @@ -362,7 +368,6 @@ impl ExecutionPlan for PaimonTableScan { return None; } - let fetch = fetch.filter(|fetch| *fetch > 0); if self.fetch == fetch { return Some(Arc::new(self.clone())); } @@ -632,9 +637,17 @@ mod tests { .as_any() .downcast_ref::() .expect("with_fetch should return a rebuilt PaimonTableScan"); + let zero_fetch_scan = ExecutionPlan::with_fetch(&exact_scan, Some(0)) + .expect("safe scans should preserve zero fetch pushdown"); + let zero_fetch_scan = zero_fetch_scan + .as_any() + .downcast_ref::() + .expect("with_fetch should return a rebuilt PaimonTableScan"); assert_eq!(ExecutionPlan::fetch(fetched_scan), Some(10)); assert_eq!(fetched_scan.limit_hint(), Some(10)); + assert_eq!(ExecutionPlan::fetch(zero_fetch_scan), Some(0)); + assert_eq!(zero_fetch_scan.limit_hint(), Some(0)); assert!(ExecutionPlan::with_fetch(&disabled_scan, Some(10)).is_none()); } @@ -767,6 +780,17 @@ mod tests { assert_eq!(plan.partition_fetches, vec![Some(2), Some(2), Some(1)]); } + #[test] + fn test_build_partition_plan_preserves_zero_fetch_contract() { + let plan = build_partition_plan(&[counted_split("a.parquet", 2)], 2, None, Some(0)); + + assert_eq!( + plan_file_names(&plan.planned_partitions), + vec![Vec::::new()] + ); + assert_eq!(plan.partition_fetches, vec![Some(0)]); + } + /// Constructs a minimal Table for testing (no real files needed since we /// only test PlanProperties, not actual reads). fn dummy_table() -> Table { diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 6d364764..71e48058 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -125,8 +125,10 @@ impl TableProvider for PaimonTableProvider { self.table.schema().fields(), self.table.schema().partition_keys(), ); - let limit_hint = - limit.filter(|limit| *limit > 0 && scan_fetch_mode == ScanFetchMode::Exact); + // Keep the full split plan here so physical with_fetch() can rebuild for + // OFFSET + LIMIT later; narrowing the provider scan too early would lose + // the extra skip budget. + let limit_hint = limit.filter(|_| scan_fetch_mode == ScanFetchMode::Exact); let scan = read_builder.new_scan(); // DataFusion's Python FFI may poll `TableProvider::scan()` without an active // Tokio runtime. `scan.plan()` can reach OpenDAL/Tokio filesystem calls while diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index e78d43bb..c20e6c56 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -329,6 +329,9 @@ pub fn prune_splits_by_limit_hint( let Some(limit_hint) = limit_hint else { return splits.into_iter().collect(); }; + if limit_hint == 0 { + return Vec::new(); + } let mut limited_splits = Vec::new(); let mut scanned_row_count: i64 = 0; @@ -419,10 +422,6 @@ impl<'a> TableScan<'a> { } } - fn apply_limit_pushdown(&self, splits: Vec) -> Vec { - prune_splits_by_limit_hint(splits, self.limit) - } - async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { let file_io = self.table.file_io(); let table_path = self.table.location(); @@ -705,7 +704,7 @@ impl<'a> TableScan<'a> { // With data predicates or row_ranges, merged_row_count() reflects pre-filter // row counts, so stopping early could return fewer rows than the limit. let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() { - self.apply_limit_pushdown(splits) + prune_splits_by_limit_hint(splits, self.limit) } else { splits }; @@ -914,6 +913,18 @@ mod tests { assert_eq!(split_file_names(&pruned), vec!["large.parquet"]); } + #[test] + fn test_prune_splits_by_limit_hint_zero_returns_empty() { + let splits = vec![ + limit_test_split("a.parquet", 2), + limit_test_split("b.parquet", 3), + ]; + + let pruned = prune_splits_by_limit_hint(splits, Some(0)); + + assert!(pruned.is_empty()); + } + #[test] fn test_partition_matches_predicate_decode_failure_fails_open() { let predicate = PredicateBuilder::new(&partition_string_field()) From 1b625deba5b8fc41982c427edf5ccbc878e18c59 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Wed, 8 Apr 2026 11:37:35 +0800 Subject: [PATCH 3/7] Refocus DataFusion scan pruning on core-owned limit hints --- .../datafusion/src/filter_pushdown.rs | 74 ++- .../datafusion/src/physical_plan/scan.rs | 588 ++---------------- .../integrations/datafusion/src/table/mod.rs | 105 +++- .../datafusion/tests/read_tables.rs | 26 +- 4 files changed, 172 insertions(+), 621 deletions(-) diff --git a/crates/integrations/datafusion/src/filter_pushdown.rs b/crates/integrations/datafusion/src/filter_pushdown.rs index b1a1ad93..34c3f04a 100644 --- a/crates/integrations/datafusion/src/filter_pushdown.rs +++ b/crates/integrations/datafusion/src/filter_pushdown.rs @@ -20,17 +20,6 @@ use datafusion::logical_expr::expr::InList; use datafusion::logical_expr::{Between, BinaryExpr, Expr, Operator, TableProviderFilterPushDown}; use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder}; -/// Whether scan-side row-count pruning can safely participate in physical fetch pushdown. -/// -/// `Exact` means every query filter is exact at the table-provider boundary, so -/// scan-side split pruning and physical fetch execution can be enabled without -/// leaving residual filtering above the scan. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) enum ScanFetchMode { - Disabled, - Exact, -} - pub(crate) fn classify_filter_pushdown( filter: &Expr, fields: &[DataField], @@ -69,21 +58,20 @@ pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> } } -pub(crate) fn scan_fetch_mode( +/// Whether it is safe to pass a row-count hint down to paimon-core planning. +/// +/// This stays intentionally narrow: the hint is only safe when there are no +/// filters, or when every filter is exact at the table-provider boundary. +pub(crate) fn can_push_down_limit_hint( filters: &[Expr], fields: &[DataField], partition_keys: &[String], -) -> ScanFetchMode { - if filters.is_empty() +) -> bool { + filters.is_empty() || filters.iter().all(|filter| { classify_filter_pushdown(filter, fields, partition_keys) == TableProviderFilterPushDown::Exact }) - { - ScanFetchMode::Exact - } else { - ScanFetchMode::Disabled - } } fn split_conjunction(expr: &Expr) -> Vec<&Expr> { @@ -421,13 +409,51 @@ mod tests { } #[test] - fn test_scan_fetch_mode_is_exact_without_filters() { + fn test_can_push_down_limit_hint_without_filters() { let fields = test_fields(); - assert_eq!( - scan_fetch_mode(&[], &fields, &partition_keys()), - ScanFetchMode::Exact - ); + assert!(can_push_down_limit_hint(&[], &fields, &partition_keys())); + } + + #[test] + fn test_can_push_down_limit_hint_for_exact_filters() { + let fields = test_fields(); + let filters = vec![ + Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")), + Expr::Column(Column::from_name("hr")).eq(lit(10)), + ]; + + assert!(can_push_down_limit_hint( + &filters, + &fields, + &partition_keys() + )); + } + + #[test] + fn test_can_push_down_limit_hint_rejects_inexact_filters() { + let fields = test_fields(); + let filters = vec![Expr::Column(Column::from_name("id")).gt(lit(10))]; + + assert!(!can_push_down_limit_hint( + &filters, + &fields, + &partition_keys() + )); + } + + #[test] + fn test_can_push_down_limit_hint_rejects_unsupported_filters() { + let fields = test_fields(); + let filters = vec![Expr::Not(Box::new( + Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")), + ))]; + + assert!(!can_push_down_limit_hint( + &filters, + &fields, + &partition_keys() + )); } #[test] diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 1a678512..7389a440 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -16,12 +16,9 @@ // under the License. use std::any::Any; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::stats::Precision; use datafusion::common::Statistics; use datafusion::error::Result as DFResult; @@ -29,279 +26,45 @@ use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::EquivalenceProperties; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; -use datafusion::physical_plan::{ - DisplayAs, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, -}; -use futures::{Stream, StreamExt, TryStreamExt}; +use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; +use futures::{StreamExt, TryStreamExt}; use paimon::spec::Predicate; -use paimon::table::{prune_splits_by_limit_hint, Table}; +use paimon::table::Table; use paimon::DataSplit; use crate::error::to_datafusion_error; -use crate::filter_pushdown::ScanFetchMode; - -#[derive(Debug)] -pub(crate) struct PartitionPlan { - planned_partitions: Vec>, - partition_fetches: Vec>, -} - -#[derive(Debug, Clone)] -pub(crate) struct PaimonTableScanSpec { - pub table: Table, - pub projected_columns: Option>, - pub pushed_predicate: Option, - pub full_splits: Arc<[DataSplit]>, - pub target_partitions: usize, - pub scan_fetch_mode: ScanFetchMode, - pub limit_hint: Option, - pub fetch: Option, -} - -fn round_robin_buckets(items: Vec, num_buckets: usize) -> Vec> { - let mut buckets: Vec> = (0..num_buckets).map(|_| Vec::new()).collect(); - for (index, item) in items.into_iter().enumerate() { - buckets[index % num_buckets].push(item); - } - buckets -} - -fn empty_partition_plan() -> PartitionPlan { - PartitionPlan { - planned_partitions: vec![Arc::from(Vec::new())], - partition_fetches: vec![None], - } -} - -fn build_round_robin_plan(splits: Vec, target_partitions: usize) -> PartitionPlan { - if splits.is_empty() { - return empty_partition_plan(); - } - - let num_partitions = splits.len().min(target_partitions.max(1)); - let buckets = round_robin_buckets(splits, num_partitions); - - PartitionPlan { - planned_partitions: buckets.into_iter().map(Arc::from).collect(), - partition_fetches: vec![None; num_partitions], - } -} - -fn build_exact_fetch_plan( - splits: Vec, - target_partitions: usize, - fetch: usize, -) -> PartitionPlan { - if splits.is_empty() { - return empty_partition_plan(); - } - - if target_partitions <= 1 { - return PartitionPlan { - planned_partitions: vec![Arc::from(splits)], - partition_fetches: vec![Some(fetch)], - }; - } - - let mut prefix_splits = Vec::new(); - let mut prefix_rows = Vec::new(); - let mut remaining = fetch; - let mut tail_start = None; - - for (index, split) in splits.iter().enumerate() { - match split.merged_row_count() { - Some(count) if count >= 0 && (count as usize) <= remaining => { - prefix_splits.push(split.clone()); - prefix_rows.push(count as usize); - remaining -= count as usize; - if remaining == 0 { - break; - } - } - _ => { - tail_start = Some(index); - break; - } - } - } - - if tail_start.is_none() { - let num_partitions = prefix_splits.len().min(target_partitions.max(1)); - let split_buckets = round_robin_buckets(prefix_splits, num_partitions); - let row_buckets = round_robin_buckets(prefix_rows, num_partitions); - - return PartitionPlan { - planned_partitions: split_buckets.into_iter().map(Arc::from).collect(), - partition_fetches: row_buckets - .into_iter() - .map(|rows| Some(rows.into_iter().sum())) - .collect(), - }; - } - - let tail_start = tail_start.unwrap(); - let tail_fetch = remaining; - - if prefix_splits.is_empty() { - return PartitionPlan { - planned_partitions: vec![Arc::from(splits)], - partition_fetches: vec![Some(fetch)], - }; - } - - let prefix_partition_count = prefix_splits - .len() - .min(target_partitions.saturating_sub(1).max(1)); - let split_buckets = round_robin_buckets(prefix_splits, prefix_partition_count); - let row_buckets = round_robin_buckets(prefix_rows, prefix_partition_count); - - let mut planned_partitions: Vec> = - split_buckets.into_iter().map(Arc::from).collect(); - let mut partition_fetches: Vec> = row_buckets - .into_iter() - .map(|rows| Some(rows.into_iter().sum())) - .collect(); - - planned_partitions.push(Arc::from(splits[tail_start..].to_vec())); - partition_fetches.push(Some(tail_fetch)); - - PartitionPlan { - planned_partitions, - partition_fetches, - } -} - -pub(crate) fn build_partition_plan( - full_splits: &[DataSplit], - target_partitions: usize, - limit_hint: Option, - fetch: Option, -) -> PartitionPlan { - if let Some(fetch) = fetch { - if fetch == 0 { - return PartitionPlan { - planned_partitions: vec![Arc::from(Vec::new())], - partition_fetches: vec![Some(0)], - }; - } - let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), Some(fetch)); - build_exact_fetch_plan(selected_splits, target_partitions, fetch) - } else { - let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), limit_hint); - build_round_robin_plan(selected_splits, target_partitions) - } -} - -struct ExactFetchStream { - input: SendableRecordBatchStream, - remaining: usize, -} - -impl ExactFetchStream { - fn new(input: SendableRecordBatchStream, remaining: usize) -> Self { - Self { input, remaining } - } -} - -impl Stream for ExactFetchStream { - type Item = DFResult; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.remaining == 0 { - return Poll::Ready(None); - } - - let next = futures::ready!(self.input.as_mut().poll_next(cx)); - match next { - Some(Ok(batch)) => { - let batch = if batch.num_rows() > self.remaining { - batch.slice(0, self.remaining) - } else { - batch - }; - self.remaining -= batch.num_rows(); - Poll::Ready(Some(Ok(batch))) - } - Some(Err(error)) => Poll::Ready(Some(Err(error))), - None => Poll::Ready(None), - } - } -} - -impl RecordBatchStream for ExactFetchStream { - fn schema(&self) -> ArrowSchemaRef { - self.input.schema() - } -} /// Execution plan that scans a Paimon table with optional column projection. /// /// Planning is performed eagerly in [`super::super::table::PaimonTableProvider::scan`], /// and the resulting splits are distributed across DataFusion execution partitions /// so that DataFusion can schedule them in parallel. -#[derive(Debug, Clone)] +#[derive(Debug)] pub(crate) struct PaimonTableScan { table: Table, /// Projected column names (if None, reads all columns). projected_columns: Option>, - /// Filter translated from DataFusion expressions and reused during execute(). + /// Filter translated from DataFusion expressions and reused during execute() + /// so reader-side pruning reaches the actual read path. pushed_predicate: Option, - /// Full split plan after filter pushdown, before any row-count hint is applied. - full_splits: Arc<[DataSplit]>, - /// Split assignments per DataFusion partition. + /// Pre-planned partition assignments: `planned_partitions[i]` contains the + /// Paimon splits that DataFusion partition `i` will read. + /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`. planned_partitions: Vec>, - partition_fetches: Vec>, - target_partitions: usize, plan_properties: PlanProperties, - scan_fetch_mode: ScanFetchMode, - limit_hint: Option, - fetch: Option, + /// Optional limit hint pushed to paimon-core planning. + limit: Option, } impl PaimonTableScan { - pub(crate) fn new(schema: ArrowSchemaRef, spec: PaimonTableScanSpec) -> Self { - let PartitionPlan { - planned_partitions, - partition_fetches, - } = build_partition_plan( - &spec.full_splits, - spec.target_partitions, - spec.limit_hint, - spec.fetch, - ); - let plan_properties = PlanProperties::new( - EquivalenceProperties::new(schema.clone()), - Partitioning::UnknownPartitioning(planned_partitions.len()), - EmissionType::Incremental, - Boundedness::Bounded, - ); - Self { - table: spec.table, - projected_columns: spec.projected_columns, - pushed_predicate: spec.pushed_predicate, - full_splits: spec.full_splits, - planned_partitions, - partition_fetches, - target_partitions: spec.target_partitions, - plan_properties, - scan_fetch_mode: spec.scan_fetch_mode, - limit_hint: spec.limit_hint, - fetch: spec.fetch, - } - } - - #[cfg(test)] - pub(crate) fn new_with_planned_partitions( + pub(crate) fn new( schema: ArrowSchemaRef, - spec: PaimonTableScanSpec, + table: Table, + projected_columns: Option>, + pushed_predicate: Option, planned_partitions: Vec>, - partition_fetches: Vec>, + limit: Option, ) -> Self { - let full_splits = planned_partitions - .iter() - .flat_map(|partition| partition.iter().cloned()) - .collect::>(); - let target_partitions = planned_partitions.len().max(1); let plan_properties = PlanProperties::new( EquivalenceProperties::new(schema.clone()), Partitioning::UnknownPartitioning(planned_partitions.len()), @@ -309,17 +72,12 @@ impl PaimonTableScan { Boundedness::Bounded, ); Self { - table: spec.table, - projected_columns: spec.projected_columns, - pushed_predicate: spec.pushed_predicate, - full_splits: Arc::from(full_splits), + table, + projected_columns, + pushed_predicate, planned_partitions, - partition_fetches, - target_partitions, plan_properties, - scan_fetch_mode: spec.scan_fetch_mode, - limit_hint: spec.limit_hint, - fetch: spec.fetch, + limit, } } @@ -334,8 +92,8 @@ impl PaimonTableScan { } #[cfg(test)] - pub(crate) fn limit_hint(&self) -> Option { - self.limit_hint + pub(crate) fn limit(&self) -> Option { + self.limit } } @@ -363,34 +121,6 @@ impl ExecutionPlan for PaimonTableScan { Ok(self) } - fn with_fetch(&self, fetch: Option) -> Option> { - if self.scan_fetch_mode != ScanFetchMode::Exact { - return None; - } - - if self.fetch == fetch { - return Some(Arc::new(self.clone())); - } - - Some(Arc::new(Self::new( - self.schema(), - PaimonTableScanSpec { - table: self.table.clone(), - projected_columns: self.projected_columns.clone(), - pushed_predicate: self.pushed_predicate.clone(), - full_splits: Arc::clone(&self.full_splits), - target_partitions: self.target_partitions, - scan_fetch_mode: self.scan_fetch_mode, - limit_hint: fetch, - fetch, - }, - ))) - } - - fn fetch(&self) -> Option { - self.fetch - } - fn execute( &self, partition: usize, @@ -402,12 +132,6 @@ impl ExecutionPlan for PaimonTableScan { self.planned_partitions.len() )) })?); - let partition_fetch = *self.partition_fetches.get(partition).ok_or_else(|| { - datafusion::error::DataFusionError::Internal(format!( - "PaimonTableScan: partition fetch index {partition} out of range (total {})", - self.partition_fetches.len() - )) - })?; let table = self.table.clone(); let schema = self.schema(); @@ -435,16 +159,10 @@ impl ExecutionPlan for PaimonTableScan { )) }; - let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new( + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(fut).try_flatten(), - )); - - if let Some(fetch) = partition_fetch { - Ok(Box::pin(ExactFetchStream::new(stream, fetch))) - } else { - Ok(stream) - } + ))) } fn statistics(&self) -> DFResult { @@ -467,13 +185,6 @@ impl ExecutionPlan for PaimonTableScan { } } } - if let Some(fetch) = - partition.and_then(|idx| self.partition_fetches.get(idx).copied().flatten()) - { - total_rows = total_rows.min(fetch); - } else if let Some(fetch) = self.fetch { - total_rows = total_rows.min(fetch); - } Ok(Statistics { num_rows: Precision::Inexact(total_rows), @@ -510,11 +221,8 @@ impl DisplayAs for PaimonTableScan { if let Some(ref predicate) = self.pushed_predicate { write!(f, ", predicate={predicate}")?; } - if let Some(limit_hint) = self.limit_hint { - write!(f, ", limit_hint={limit_hint}")?; - } - if let Some(fetch) = self.fetch { - write!(f, ", fetch={fetch}")?; + if let Some(limit) = self.limit { + write!(f, ", limit={limit}")?; } Ok(()) } @@ -537,7 +245,6 @@ mod tests { use paimon::spec::{ BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema, }; - use paimon::DeletionFile; use std::fs; use tempfile::tempdir; use test_utils::{local_file_path, test_data_file, write_int_parquet_file}; @@ -553,20 +260,13 @@ mod tests { #[test] fn test_partition_count_empty_plan() { let schema = test_schema(); - let scan = PaimonTableScan::new_with_planned_partitions( + let scan = PaimonTableScan::new( schema, - PaimonTableScanSpec { - table: dummy_table(), - projected_columns: None, - pushed_predicate: None, - full_splits: Arc::from(Vec::new()), - target_partitions: 1, - scan_fetch_mode: ScanFetchMode::Disabled, - limit_hint: None, - fetch: None, - }, + dummy_table(), + None, + None, vec![Arc::from(Vec::new())], - vec![None], + None, ); assert_eq!(scan.properties().output_partitioning().partition_count(), 1); } @@ -579,218 +279,11 @@ mod tests { Arc::from(Vec::new()), Arc::from(Vec::new()), ]; - let scan = PaimonTableScan::new_with_planned_partitions( - schema, - PaimonTableScanSpec { - table: dummy_table(), - projected_columns: None, - pushed_predicate: None, - full_splits: Arc::from(Vec::new()), - target_partitions: 3, - scan_fetch_mode: ScanFetchMode::Disabled, - limit_hint: None, - fetch: None, - }, - planned_partitions, - vec![None, None, None], - ); + let scan = + PaimonTableScan::new(schema, dummy_table(), None, None, planned_partitions, None); assert_eq!(scan.properties().output_partitioning().partition_count(), 3); } - #[test] - fn test_scan_exposes_fetch_capability_only_when_safe() { - let schema = test_schema(); - let exact_scan = PaimonTableScan::new_with_planned_partitions( - schema, - PaimonTableScanSpec { - table: dummy_table(), - projected_columns: None, - pushed_predicate: None, - full_splits: Arc::from(Vec::new()), - target_partitions: 1, - scan_fetch_mode: ScanFetchMode::Exact, - limit_hint: Some(10), - fetch: None, - }, - vec![Arc::from(Vec::new())], - vec![None], - ); - let disabled_scan = PaimonTableScan::new_with_planned_partitions( - test_schema(), - PaimonTableScanSpec { - table: dummy_table(), - projected_columns: None, - pushed_predicate: None, - full_splits: Arc::from(Vec::new()), - target_partitions: 1, - scan_fetch_mode: ScanFetchMode::Disabled, - limit_hint: Some(10), - fetch: None, - }, - vec![Arc::from(Vec::new())], - vec![None], - ); - - let fetched_scan = ExecutionPlan::with_fetch(&exact_scan, Some(10)) - .expect("safe scans should accept physical fetch pushdown"); - let fetched_scan = fetched_scan - .as_any() - .downcast_ref::() - .expect("with_fetch should return a rebuilt PaimonTableScan"); - let zero_fetch_scan = ExecutionPlan::with_fetch(&exact_scan, Some(0)) - .expect("safe scans should preserve zero fetch pushdown"); - let zero_fetch_scan = zero_fetch_scan - .as_any() - .downcast_ref::() - .expect("with_fetch should return a rebuilt PaimonTableScan"); - - assert_eq!(ExecutionPlan::fetch(fetched_scan), Some(10)); - assert_eq!(fetched_scan.limit_hint(), Some(10)); - assert_eq!(ExecutionPlan::fetch(zero_fetch_scan), Some(0)); - assert_eq!(zero_fetch_scan.limit_hint(), Some(0)); - assert!(ExecutionPlan::with_fetch(&disabled_scan, Some(10)).is_none()); - } - - fn plan_file_names(partitions: &[Arc<[DataSplit]>]) -> Vec> { - partitions - .iter() - .map(|partition| { - partition - .iter() - .map(|split| split.data_files()[0].file_name.clone()) - .collect() - }) - .collect() - } - - fn counted_split(file_name: &str, row_count: i64) -> DataSplit { - paimon::DataSplitBuilder::new() - .with_snapshot(1) - .with_partition(BinaryRow::new(0)) - .with_bucket(0) - .with_bucket_path(format!("file:/tmp/{file_name}")) - .with_total_buckets(1) - .with_data_files(vec![test_data_file(file_name, row_count)]) - .with_raw_convertible(true) - .build() - .unwrap() - } - - fn unknown_count_split(file_name: &str, row_count: i64) -> DataSplit { - paimon::DataSplitBuilder::new() - .with_snapshot(1) - .with_partition(BinaryRow::new(0)) - .with_bucket(0) - .with_bucket_path(format!("file:/tmp/{file_name}")) - .with_total_buckets(1) - .with_data_files(vec![test_data_file(file_name, row_count)]) - .with_data_deletion_files(vec![Some(DeletionFile::new( - format!("file:/tmp/{file_name}.dv"), - 0, - 0, - None, - ))]) - .with_raw_convertible(true) - .build() - .unwrap() - } - - #[test] - fn test_build_partition_plan_assigns_unknown_merged_row_count_to_tail_fetch_partition() { - let full_splits = vec![ - counted_split("a.parquet", 2), - unknown_count_split("b.parquet", 4), - counted_split("c.parquet", 3), - ]; - - let plan = build_partition_plan(&full_splits, 3, None, Some(3)); - - assert_eq!( - plan_file_names(&plan.planned_partitions), - vec![ - vec!["a.parquet".to_string()], - vec!["b.parquet".to_string(), "c.parquet".to_string()], - ] - ); - assert_eq!(plan.partition_fetches, vec![Some(2), Some(1)]); - } - - #[test] - fn test_build_exact_fetch_plan_keeps_large_first_split_in_single_partition() { - let plan = build_exact_fetch_plan( - vec![ - counted_split("large.parquet", 10), - counted_split("tail.parquet", 2), - ], - 4, - 3, - ); - - assert_eq!( - plan_file_names(&plan.planned_partitions), - vec![vec![ - "large.parquet".to_string(), - "tail.parquet".to_string() - ]] - ); - assert_eq!(plan.partition_fetches, vec![Some(3)]); - } - - #[test] - fn test_build_exact_fetch_plan_preserves_zero_row_prefix_budget() { - let plan = build_exact_fetch_plan( - vec![ - counted_split("zero.parquet", 0), - counted_split("tail.parquet", 5), - ], - 2, - 3, - ); - - assert_eq!( - plan_file_names(&plan.planned_partitions), - vec![ - vec!["zero.parquet".to_string()], - vec!["tail.parquet".to_string()], - ] - ); - assert_eq!(plan.partition_fetches, vec![Some(0), Some(3)]); - } - - #[test] - fn test_build_exact_fetch_plan_splits_prefix_and_tail_fetches_across_partitions() { - let plan = build_exact_fetch_plan( - vec![ - counted_split("a.parquet", 2), - counted_split("b.parquet", 2), - counted_split("c.parquet", 10), - ], - 3, - 5, - ); - - assert_eq!( - plan_file_names(&plan.planned_partitions), - vec![ - vec!["a.parquet".to_string()], - vec!["b.parquet".to_string()], - vec!["c.parquet".to_string()], - ] - ); - assert_eq!(plan.partition_fetches, vec![Some(2), Some(2), Some(1)]); - } - - #[test] - fn test_build_partition_plan_preserves_zero_fetch_contract() { - let plan = build_partition_plan(&[counted_split("a.parquet", 2)], 2, None, Some(0)); - - assert_eq!( - plan_file_names(&plan.planned_partitions), - vec![Vec::::new()] - ); - assert_eq!(plan.partition_fetches, vec![Some(0)]); - } - /// Constructs a minimal Table for testing (no real files needed since we /// only test PlanProperties, not actual reads). fn dummy_table() -> Table { @@ -856,16 +349,11 @@ mod tests { )])); let scan = PaimonTableScan::new( schema, - PaimonTableScanSpec { - table, - projected_columns: Some(vec!["id".to_string()]), - pushed_predicate: Some(pushed_predicate), - full_splits: Arc::from(vec![split]), - target_partitions: 1, - scan_fetch_mode: ScanFetchMode::Disabled, - limit_hint: None, - fetch: None, - }, + table, + Some(vec!["id".to_string()]), + Some(pushed_predicate), + vec![Arc::from(vec![split])], + None, ); let ctx = SessionContext::new(); diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 71e48058..5b4ae794 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -31,9 +31,8 @@ use paimon::table::Table; use crate::error::to_datafusion_error; use crate::filter_pushdown::{ - build_pushed_predicate, classify_filter_pushdown, scan_fetch_mode, ScanFetchMode, + build_pushed_predicate, can_push_down_limit_hint, classify_filter_pushdown, }; -use crate::physical_plan::scan::PaimonTableScanSpec; use crate::physical_plan::PaimonTableScan; use crate::runtime::await_with_runtime; @@ -76,6 +75,15 @@ impl PaimonTableProvider { } } +/// Distribute `items` into `num_buckets` groups using round-robin assignment. +fn bucket_round_robin(items: Vec, num_buckets: usize) -> Vec> { + let mut buckets: Vec> = (0..num_buckets).map(|_| Vec::new()).collect(); + for (index, item) in items.into_iter().enumerate() { + buckets[index % num_buckets].push(item); + } + buckets +} + #[async_trait] impl TableProvider for PaimonTableProvider { fn as_any(&self) -> &dyn Any { @@ -120,15 +128,16 @@ impl TableProvider for PaimonTableProvider { if let Some(filter) = pushed_predicate.clone() { read_builder.with_filter(filter); } - let scan_fetch_mode = scan_fetch_mode( - filters, - self.table.schema().fields(), - self.table.schema().partition_keys(), - ); - // Keep the full split plan here so physical with_fetch() can rebuild for - // OFFSET + LIMIT later; narrowing the provider scan too early would lose - // the extra skip budget. - let limit_hint = limit.filter(|_| scan_fetch_mode == ScanFetchMode::Exact); + let pushed_limit = limit.filter(|_| { + can_push_down_limit_hint( + filters, + self.table.schema().fields(), + self.table.schema().partition_keys(), + ) + }); + if let Some(limit) = pushed_limit { + read_builder.with_limit(limit); + } let scan = read_builder.new_scan(); // DataFusion's Python FFI may poll `TableProvider::scan()` without an active // Tokio runtime. `scan.plan()` can reach OpenDAL/Tokio filesystem calls while @@ -138,21 +147,29 @@ impl TableProvider for PaimonTableProvider { .await .map_err(to_datafusion_error)?; - let target_partitions = state.config_options().execution.target_partitions.max(1); - let full_splits = Arc::from(plan.splits().to_vec()); + // Distribute splits across DataFusion partitions, capped by the + // session's target_partitions to avoid over-sharding with many small splits. + // Each partition's splits are wrapped in Arc to avoid deep-cloning in execute(). + let splits = plan.splits().to_vec(); + let planned_partitions: Vec> = if splits.is_empty() { + // Empty plans get a single empty partition to avoid 0-partition edge cases. + vec![Arc::from(Vec::new())] + } else { + let target = state.config_options().execution.target_partitions; + let num_partitions = splits.len().min(target.max(1)); + bucket_round_robin(splits, num_partitions) + .into_iter() + .map(Arc::from) + .collect() + }; Ok(Arc::new(PaimonTableScan::new( projected_schema, - PaimonTableScanSpec { - table: self.table.clone(), - projected_columns, - pushed_predicate, - full_splits, - target_partitions, - scan_fetch_mode, - limit_hint, - fetch: None, - }, + self.table.clone(), + projected_columns, + pushed_predicate, + planned_partitions, + pushed_limit, ))) } @@ -178,13 +195,30 @@ mod tests { use datafusion::datasource::TableProvider; use datafusion::logical_expr::{col, lit, Expr}; - use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{SessionConfig, SessionContext}; use paimon::catalog::Identifier; use paimon::{Catalog, CatalogOptions, DataSplit, FileSystemCatalog, Options}; use crate::physical_plan::PaimonTableScan; + #[test] + fn test_bucket_round_robin_distributes_evenly() { + let result = bucket_round_robin(vec![0, 1, 2, 3, 4], 3); + assert_eq!(result, vec![vec![0, 3], vec![1, 4], vec![2]]); + } + + #[test] + fn test_bucket_round_robin_fewer_items_than_buckets() { + let result = bucket_round_robin(vec![10, 20], 2); + assert_eq!(result, vec![vec![10], vec![20]]); + } + + #[test] + fn test_bucket_round_robin_single_bucket() { + let result = bucket_round_robin(vec![1, 2, 3], 1); + assert_eq!(result, vec![vec![1, 2, 3]]); + } + fn get_test_warehouse() -> String { std::env::var("PAIMON_TEST_WAREHOUSE") .unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string()) @@ -350,20 +384,13 @@ mod tests { async fn test_scan_applies_limit_hint_only_when_safe() { let provider = create_provider("partitioned_log_table").await; let full_plan = plan_partitions(&provider, vec![], None).await; - - let config = SessionConfig::new().with_target_partitions(8); - let ctx = SessionContext::new_with_config(config); - let state = ctx.state(); - let plan = provider - .scan(&state, None, &[], Some(1)) - .await - .expect("scan() should succeed"); + let plan = plan_scan(&provider, vec![], Some(1)).await; let scan = plan .as_any() .downcast_ref::() .expect("Expected PaimonTableScan"); - assert_eq!(scan.limit_hint(), Some(1)); + assert_eq!(scan.limit(), Some(1)); assert!( scan.planned_partitions() .iter() @@ -375,4 +402,16 @@ mod tests { .sum::() ); } + + #[tokio::test] + async fn test_scan_skips_limit_hint_for_inexact_filters() { + let provider = create_provider("partitioned_log_table").await; + let plan = plan_scan(&provider, vec![col("id").gt(lit(1))], Some(1)).await; + let scan = plan + .as_any() + .downcast_ref::() + .expect("Expected PaimonTableScan"); + + assert_eq!(scan.limit(), None); + } } diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index f4b60106..232e083a 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -352,7 +352,7 @@ async fn test_limit_pushdown_on_data_evolution_table_returns_merged_rows() { } #[tokio::test] -async fn test_limit_pushdown_applies_safe_scan_fetch_contract_and_keeps_correctness() { +async fn test_limit_pushdown_marks_safe_scan_limit_hint_and_keeps_correctness() { let sql = "SELECT id, name FROM simple_log_table LIMIT 2"; let plan = create_physical_plan("simple_log_table", sql) .await @@ -363,8 +363,8 @@ async fn test_limit_pushdown_applies_safe_scan_fetch_contract_and_keeps_correctn assert!( scan_lines .iter() - .any(|line| line.contains("limit_hint=2") && line.contains("fetch=2")), - "Safe LIMIT query should push DataFusion physical fetch into PaimonTableScan, plan:\n{plan_text}" + .any(|line| line.contains("limit=2") && !line.contains("fetch=")), + "Safe LIMIT query should push a scan limit hint into PaimonTableScan, plan:\n{plan_text}" ); let batches = collect_query("simple_log_table", sql) @@ -375,7 +375,7 @@ async fn test_limit_pushdown_applies_safe_scan_fetch_contract_and_keeps_correctn } #[tokio::test] -async fn test_offset_limit_pushdown_applies_scan_fetch_contract_and_keeps_correctness() { +async fn test_offset_limit_pushdown_keeps_correctness_without_fetch_contract() { let sql = "SELECT id, name FROM partitioned_log_table OFFSET 1 LIMIT 1"; let plan = create_physical_plan("partitioned_log_table", sql) .await @@ -388,10 +388,8 @@ async fn test_offset_limit_pushdown_applies_scan_fetch_contract_and_keeps_correc "OFFSET queries should keep a GlobalLimitExec in DataFusion, plan:\n{plan_text}" ); assert!( - scan_lines - .iter() - .any(|line| line.contains("limit_hint=2") && line.contains("fetch=2")), - "OFFSET + LIMIT should rebuild the scan with fetch=skip+fetch, plan:\n{plan_text}" + scan_lines.iter().all(|line| !line.contains("fetch=")), + "OFFSET + LIMIT should not rely on the removed DataFusion fetch contract in PaimonTableScan, plan:\n{plan_text}" ); let batches = collect_query("partitioned_log_table", sql) @@ -406,7 +404,7 @@ async fn test_offset_limit_pushdown_applies_scan_fetch_contract_and_keeps_correc } #[tokio::test] -async fn test_inexact_filter_limit_disables_scan_limit_hint_and_keeps_correctness() { +async fn test_inexact_filter_limit_skips_scan_limit_hint_and_keeps_correctness() { let sql = "SELECT id, name FROM partitioned_log_table WHERE id > 1 LIMIT 1"; let plan = create_physical_plan("partitioned_log_table", sql) .await @@ -421,8 +419,8 @@ async fn test_inexact_filter_limit_disables_scan_limit_hint_and_keeps_correctnes assert!( scan_lines .iter() - .all(|line| !line.contains("limit_hint=") && !line.contains("fetch=")), - "Inexact filter queries should fail open without scan fetch pushdown, plan:\n{plan_text}" + .all(|line| !line.contains("limit=") && !line.contains("fetch=")), + "Inexact filter queries should fail open without scan limit pushdown, plan:\n{plan_text}" ); let batches = collect_query("partitioned_log_table", sql) @@ -436,7 +434,7 @@ async fn test_inexact_filter_limit_disables_scan_limit_hint_and_keeps_correctnes } #[tokio::test] -async fn test_residual_filter_limit_disables_scan_limit_hint_and_keeps_correctness() { +async fn test_residual_filter_limit_skips_scan_limit_hint_and_keeps_correctness() { let sql = "SELECT id, name FROM simple_log_table WHERE id + 1 > 3 LIMIT 1"; let plan = create_physical_plan("simple_log_table", sql) .await @@ -451,8 +449,8 @@ async fn test_residual_filter_limit_disables_scan_limit_hint_and_keeps_correctne assert!( scan_lines .iter() - .all(|line| !line.contains("limit_hint=") && !line.contains("fetch=")), - "Residual filter queries should fail open without scan fetch pushdown, plan:\n{plan_text}" + .all(|line| !line.contains("limit=") && !line.contains("fetch=")), + "Residual filter queries should fail open without scan limit pushdown, plan:\n{plan_text}" ); let batches = collect_query("simple_log_table", sql) From cd3af4cd58be1dec1378431d671c96a201bacc1e Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Wed, 8 Apr 2026 13:24:39 +0800 Subject: [PATCH 4/7] Keep TableScan limit pushdown internal and preserve limit semantics --- crates/paimon/src/table/mod.rs | 2 +- crates/paimon/src/table/table_scan.rs | 150 ++++++++++++++------------ 2 files changed, 82 insertions(+), 70 deletions(-) diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 6c158976..afcefc0b 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -36,7 +36,7 @@ pub use snapshot_manager::SnapshotManager; pub use source::{ merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, }; -pub use table_scan::{prune_splits_by_limit_hint, TableScan}; +pub use table_scan::TableScan; pub use tag_manager::TagManager; use crate::catalog::Identifier; diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index c20e6c56..5c269d03 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -310,48 +310,6 @@ pub struct TableScan<'a> { row_ranges: Option>, } -/// Apply a limit-pushdown hint to the generated splits. -/// -/// Iterates through splits and accumulates `merged_row_count()` until the -/// limit hint is reached. Returns only the splits likely needed to satisfy -/// that hint. -/// -/// This does not guarantee an exact final row count. If a split's -/// `merged_row_count()` is `None` (for example because of unknown deletion -/// cardinality), that split is kept even though its contribution to the -/// limit is unknown. Planning may still stop early later if the -/// accumulated known `merged_row_count()` reaches the limit, and the -/// caller or query engine must enforce the final LIMIT. -pub fn prune_splits_by_limit_hint( - splits: impl IntoIterator, - limit_hint: Option, -) -> Vec { - let Some(limit_hint) = limit_hint else { - return splits.into_iter().collect(); - }; - if limit_hint == 0 { - return Vec::new(); - } - - let mut limited_splits = Vec::new(); - let mut scanned_row_count: i64 = 0; - - for split in splits { - match split.merged_row_count() { - Some(merged_count) => { - limited_splits.push(split); - scanned_row_count += merged_count; - if scanned_row_count >= limit_hint as i64 { - return limited_splits; - } - } - None => limited_splits.push(split), - } - } - - limited_splits -} - impl<'a> TableScan<'a> { pub fn new( table: &'a Table, @@ -422,6 +380,52 @@ impl<'a> TableScan<'a> { } } + /// Apply a limit-pushdown hint to the generated splits. + /// + /// Iterates through splits and accumulates `merged_row_count()` until the + /// limit hint is reached. Returns only the splits likely needed to satisfy + /// that hint. + /// + /// This does not guarantee an exact final row count. If a split's + /// `merged_row_count()` is `None` (for example because of unknown deletion + /// cardinality), that split is kept even though its contribution to the + /// limit is unknown. Planning may still stop early later if the + /// accumulated known `merged_row_count()` reaches the limit, and the + /// caller or query engine must enforce the final LIMIT. + fn apply_limit_pushdown(&self, splits: Vec) -> Vec { + let limit = match self.limit { + Some(l) => l, + None => return splits, + }; + if limit == 0 { + return Vec::new(); + } + + if splits.is_empty() { + return splits; + } + + let mut limited_splits = Vec::new(); + let mut scanned_row_count: i64 = 0; + + for split in splits { + match split.merged_row_count() { + Some(merged_count) => { + limited_splits.push(split); + scanned_row_count += merged_count; + if scanned_row_count >= limit as i64 { + return limited_splits; + } + } + None => { + limited_splits.push(split); + } + } + } + + limited_splits + } + async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { let file_io = self.table.file_io(); let table_path = self.table.location(); @@ -704,7 +708,7 @@ impl<'a> TableScan<'a> { // With data predicates or row_ranges, merged_row_count() reflects pre-filter // row counts, so stopping early could return fewer rows than the limit. let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() { - prune_splits_by_limit_hint(splits, self.limit) + self.apply_limit_pushdown(splits) } else { splits }; @@ -715,15 +719,19 @@ impl<'a> TableScan<'a> { #[cfg(test)] mod tests { - use super::{partition_matches_predicate, prune_splits_by_limit_hint}; + use super::{partition_matches_predicate, TableScan}; + use crate::catalog::Identifier; + use crate::io::FileIOBuilder; use crate::spec::{ stats::BinaryTableStats, ArrayType, BinaryRow, BinaryRowBuilder, DataField, DataFileMeta, DataType, Datum, DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, IntType, - Predicate, PredicateBuilder, PredicateOperator, VarCharType, + Predicate, PredicateBuilder, PredicateOperator, Schema as PaimonSchema, TableSchema, + VarCharType, }; use crate::table::bucket_filter::{compute_target_buckets, extract_predicate_for_keys}; use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile}; use crate::table::stats_filter::{data_file_matches_predicates, group_by_overlapping_row_id}; + use crate::table::Table; use crate::Error; use chrono::{DateTime, Utc}; @@ -837,6 +845,18 @@ mod tests { } } + fn limit_test_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let schema = PaimonSchema::builder().build().unwrap(); + let table_schema = TableSchema::new(0, &schema); + Table::new( + file_io, + Identifier::new("test_db", "test_table"), + "/tmp/test-table".to_string(), + table_schema, + ) + } + fn limit_test_split(file_name: &str, row_count: i64) -> DataSplit { let mut file = test_data_file_meta(Vec::new(), Vec::new(), Vec::new(), row_count); file.file_name = file_name.to_string(); @@ -886,43 +906,35 @@ mod tests { } #[test] - fn test_prune_splits_by_limit_hint_keeps_unknown_merged_row_count() { + fn test_apply_limit_pushdown_zero_returns_empty() { + let table = limit_test_table(); + let scan = TableScan::new(&table, None, vec![], None, Some(0), None); let splits = vec![ limit_test_split("a.parquet", 2), - limit_test_split_with_unknown_merged_row_count("b.parquet", 4), - limit_test_split("c.parquet", 3), - ]; - - let pruned = prune_splits_by_limit_hint(splits, Some(3)); - - assert_eq!( - split_file_names(&pruned), - vec!["a.parquet", "b.parquet", "c.parquet"] - ); - } - - #[test] - fn test_prune_splits_by_limit_hint_stops_after_first_large_split() { - let splits = vec![ - limit_test_split("large.parquet", 10), - limit_test_split("tail.parquet", 2), + limit_test_split("b.parquet", 3), ]; - let pruned = prune_splits_by_limit_hint(splits, Some(3)); + let pruned = scan.apply_limit_pushdown(splits); - assert_eq!(split_file_names(&pruned), vec!["large.parquet"]); + assert!(pruned.is_empty()); } #[test] - fn test_prune_splits_by_limit_hint_zero_returns_empty() { + fn test_apply_limit_pushdown_keeps_unknown_merged_row_count() { + let table = limit_test_table(); + let scan = TableScan::new(&table, None, vec![], None, Some(3), None); let splits = vec![ limit_test_split("a.parquet", 2), - limit_test_split("b.parquet", 3), + limit_test_split_with_unknown_merged_row_count("b.parquet", 4), + limit_test_split("c.parquet", 3), ]; - let pruned = prune_splits_by_limit_hint(splits, Some(0)); + let pruned = scan.apply_limit_pushdown(splits); - assert!(pruned.is_empty()); + assert_eq!( + split_file_names(&pruned), + vec!["a.parquet", "b.parquet", "c.parquet"] + ); } #[test] From c8e9e18ab61e6671f905df65a3549cabcd58a4a0 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Wed, 8 Apr 2026 17:00:12 +0800 Subject: [PATCH 5/7] Refine limit hint ownership for DataFusion scans --- .../datafusion/src/filter_pushdown.rs | 145 +++++++++++------- .../integrations/datafusion/src/table/mod.rs | 22 +-- crates/paimon/src/table/read_builder.rs | 62 +++++++- crates/paimon/src/table/table_scan.rs | 18 ++- 4 files changed, 181 insertions(+), 66 deletions(-) diff --git a/crates/integrations/datafusion/src/filter_pushdown.rs b/crates/integrations/datafusion/src/filter_pushdown.rs index 34c3f04a..4cb3e12f 100644 --- a/crates/integrations/datafusion/src/filter_pushdown.rs +++ b/crates/integrations/datafusion/src/filter_pushdown.rs @@ -20,58 +20,89 @@ use datafusion::logical_expr::expr::InList; use datafusion::logical_expr::{Between, BinaryExpr, Expr, Operator, TableProviderFilterPushDown}; use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder}; -pub(crate) fn classify_filter_pushdown( +#[derive(Debug)] +struct SingleFilterPushdown { + pushdown: TableProviderFilterPushDown, + translated_predicates: Vec, +} + +#[derive(Debug)] +pub(crate) struct FilterPushdownAnalysis { + pub(crate) pushed_predicate: Option, + pub(crate) has_residual_filters: bool, +} + +fn analyze_filter( filter: &Expr, fields: &[DataField], partition_keys: &[String], -) -> TableProviderFilterPushDown { +) -> SingleFilterPushdown { let translator = FilterTranslator::new(fields); - if translator.translate(filter).is_some() { + if let Some(predicate) = translator.translate(filter) { let partition_translator = FilterTranslator::for_allowed_columns(fields, partition_keys); - if partition_translator.translate(filter).is_some() { + let pushdown = if partition_translator.translate(filter).is_some() { TableProviderFilterPushDown::Exact } else { TableProviderFilterPushDown::Inexact - } - } else if split_conjunction(filter) + }; + return SingleFilterPushdown { + pushdown, + translated_predicates: vec![predicate], + }; + } + + let translated_predicates: Vec<_> = split_conjunction(filter) .into_iter() - .any(|expr| translator.translate(expr).is_some()) - { - TableProviderFilterPushDown::Inexact - } else { + .filter_map(|expr| translator.translate(expr)) + .collect(); + let pushdown = if translated_predicates.is_empty() { TableProviderFilterPushDown::Unsupported + } else { + TableProviderFilterPushDown::Inexact + }; + + SingleFilterPushdown { + pushdown, + translated_predicates, } } -pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> Option { - let translator = FilterTranslator::new(fields); - let pushed: Vec<_> = filters - .iter() - .flat_map(split_conjunction) - .filter_map(|filter| translator.translate(filter)) - .collect(); +pub(crate) fn analyze_filters( + filters: &[Expr], + fields: &[DataField], + partition_keys: &[String], +) -> FilterPushdownAnalysis { + let mut translated_predicates = Vec::new(); + let mut has_residual_filters = false; - if pushed.is_empty() { - None - } else { - Some(Predicate::and(pushed)) + for filter in filters { + let analysis = analyze_filter(filter, fields, partition_keys); + has_residual_filters |= analysis.pushdown != TableProviderFilterPushDown::Exact; + translated_predicates.extend(analysis.translated_predicates); + } + + FilterPushdownAnalysis { + pushed_predicate: if translated_predicates.is_empty() { + None + } else { + Some(Predicate::and(translated_predicates)) + }, + has_residual_filters, } } -/// Whether it is safe to pass a row-count hint down to paimon-core planning. -/// -/// This stays intentionally narrow: the hint is only safe when there are no -/// filters, or when every filter is exact at the table-provider boundary. -pub(crate) fn can_push_down_limit_hint( - filters: &[Expr], +pub(crate) fn classify_filter_pushdown( + filter: &Expr, fields: &[DataField], partition_keys: &[String], -) -> bool { - filters.is_empty() - || filters.iter().all(|filter| { - classify_filter_pushdown(filter, fields, partition_keys) - == TableProviderFilterPushDown::Exact - }) +) -> TableProviderFilterPushDown { + analyze_filter(filter, fields, partition_keys).pushdown +} + +#[cfg(test)] +pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> Option { + let no_partition_keys: &[String] = &[]; + analyze_filters(filters, fields, no_partition_keys).pushed_predicate } fn split_conjunction(expr: &Expr) -> Vec<&Expr> { @@ -409,51 +440,59 @@ mod tests { } #[test] - fn test_can_push_down_limit_hint_without_filters() { + fn test_analyze_filters_without_filters_has_no_residuals() { let fields = test_fields(); + let analysis = analyze_filters(&[], &fields, &partition_keys()); - assert!(can_push_down_limit_hint(&[], &fields, &partition_keys())); + assert!(analysis.pushed_predicate.is_none()); + assert!(!analysis.has_residual_filters); } #[test] - fn test_can_push_down_limit_hint_for_exact_filters() { + fn test_analyze_filters_for_exact_filters_has_no_residuals() { let fields = test_fields(); let filters = vec![ Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")), Expr::Column(Column::from_name("hr")).eq(lit(10)), ]; + let analysis = analyze_filters(&filters, &fields, &partition_keys()); - assert!(can_push_down_limit_hint( - &filters, - &fields, - &partition_keys() - )); + assert_eq!( + analysis + .pushed_predicate + .expect("exact filters should translate") + .to_string(), + "(dt = '2024-01-01' AND hr = 10)" + ); + assert!(!analysis.has_residual_filters); } #[test] - fn test_can_push_down_limit_hint_rejects_inexact_filters() { + fn test_analyze_filters_marks_inexact_filters_as_residual() { let fields = test_fields(); let filters = vec![Expr::Column(Column::from_name("id")).gt(lit(10))]; + let analysis = analyze_filters(&filters, &fields, &partition_keys()); - assert!(!can_push_down_limit_hint( - &filters, - &fields, - &partition_keys() - )); + assert_eq!( + analysis + .pushed_predicate + .expect("inexact filters should still translate pushed conjuncts") + .to_string(), + "id > 10" + ); + assert!(analysis.has_residual_filters); } #[test] - fn test_can_push_down_limit_hint_rejects_unsupported_filters() { + fn test_analyze_filters_marks_unsupported_filters_as_residual() { let fields = test_fields(); let filters = vec![Expr::Not(Box::new( Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")), ))]; + let analysis = analyze_filters(&filters, &fields, &partition_keys()); - assert!(!can_push_down_limit_hint( - &filters, - &fields, - &partition_keys() - )); + assert!(analysis.pushed_predicate.is_none()); + assert!(analysis.has_residual_filters); } #[test] diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 5b4ae794..f31c712e 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -30,9 +30,9 @@ use datafusion::physical_plan::ExecutionPlan; use paimon::table::Table; use crate::error::to_datafusion_error; -use crate::filter_pushdown::{ - build_pushed_predicate, can_push_down_limit_hint, classify_filter_pushdown, -}; +#[cfg(test)] +use crate::filter_pushdown::build_pushed_predicate; +use crate::filter_pushdown::{analyze_filters, classify_filter_pushdown}; use crate::physical_plan::PaimonTableScan; use crate::runtime::await_with_runtime; @@ -123,17 +123,17 @@ impl TableProvider for PaimonTableProvider { }; // Plan splits eagerly so we know partition count upfront. - let pushed_predicate = build_pushed_predicate(filters, self.table.schema().fields()); + let filter_analysis = analyze_filters( + filters, + self.table.schema().fields(), + self.table.schema().partition_keys(), + ); let mut read_builder = self.table.new_read_builder(); - if let Some(filter) = pushed_predicate.clone() { + if let Some(filter) = filter_analysis.pushed_predicate.clone() { read_builder.with_filter(filter); } let pushed_limit = limit.filter(|_| { - can_push_down_limit_hint( - filters, - self.table.schema().fields(), - self.table.schema().partition_keys(), - ) + !filter_analysis.has_residual_filters && read_builder.can_push_down_limit_hint() }); if let Some(limit) = pushed_limit { read_builder.with_limit(limit); @@ -167,7 +167,7 @@ impl TableProvider for PaimonTableProvider { projected_schema, self.table.clone(), projected_columns, - pushed_predicate, + filter_analysis.pushed_predicate, planned_partitions, pushed_limit, ))) diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index c9be32cd..9c5868d8 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -21,6 +21,7 @@ //! and [TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java). use super::bucket_filter::{extract_predicate_for_keys, split_partition_and_data_predicates}; +use super::table_scan::can_push_down_limit_hint_for_scan; use super::{ArrowRecordBatchStream, Table, TableScan}; use crate::arrow::filtering::reader_pruning_predicates; use crate::arrow::ArrowReaderBuilder; @@ -166,11 +167,22 @@ impl<'a> ReadBuilder<'a> { /// Note: This method does not guarantee that exactly `limit` rows will be /// returned by [`TableRead`]. It is only a pushdown hint for planning. /// Callers or query engines are responsible for enforcing the final LIMIT. + /// + /// Query adapters that may keep residual filtering above paimon-core + /// should only call this after checking [`Self::can_push_down_limit_hint`]. pub fn with_limit(&mut self, limit: usize) -> &mut Self { self.limit = Some(limit); self } + /// Whether the current scan state can safely consume a row-count hint. + /// + /// This only covers paimon-core's own scan semantics. Query adapters must + /// still account for any residual filtering they keep above paimon-core. + pub fn can_push_down_limit_hint(&self) -> bool { + can_push_down_limit_hint_for_scan(&self.filter.data_predicates, self.row_ranges.as_deref()) + } + /// Create a table scan. Call [TableScan::plan] to get splits. pub fn new_scan(&self) -> TableScan<'a> { TableScan::new( @@ -336,7 +348,7 @@ mod tests { use crate::spec::{ BinaryRow, DataType, IntType, Predicate, PredicateBuilder, Schema, TableSchema, VarCharType, }; - use crate::table::{DataSplitBuilder, Table}; + use crate::table::{DataSplitBuilder, RowRange, Table}; use arrow_array::{Int32Array, RecordBatch}; use futures::TryStreamExt; use std::fs; @@ -357,6 +369,54 @@ mod tests { .collect() } + fn simple_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("dt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .partition_keys(["dt"]) + .build() + .unwrap(), + ); + Table::new( + file_io, + Identifier::new("default", "t"), + "/tmp/test-read-builder".to_string(), + table_schema, + ) + } + + #[test] + fn test_limit_hint_is_safe_for_partition_only_filter() { + let table = simple_table(); + let predicate = PredicateBuilder::new(table.schema().fields()) + .equal("dt", crate::spec::Datum::String("2024-01-01".to_string())) + .unwrap(); + + let mut builder = table.new_read_builder(); + builder.with_filter(predicate); + + assert!(builder.can_push_down_limit_hint()); + } + + #[test] + fn test_limit_hint_is_not_safe_for_data_filter_or_row_ranges() { + let table = simple_table(); + let predicate = PredicateBuilder::new(table.schema().fields()) + .greater_than("id", crate::spec::Datum::Int(1)) + .unwrap(); + + let mut builder = table.new_read_builder(); + builder.with_filter(predicate); + assert!(!builder.can_push_down_limit_hint()); + + let mut builder = table.new_read_builder(); + builder.with_row_ranges(vec![RowRange::new(1, 10)]); + assert!(!builder.can_push_down_limit_hint()); + } + #[tokio::test] async fn test_new_read_pushes_filter_to_reader_when_filter_column_not_projected() { let tempdir = tempdir().unwrap(); diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 5c269d03..e1b0b19e 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -295,6 +295,18 @@ fn partition_matches_predicate( } } +/// Whether scan-owned pruning still preserves `merged_row_count()` as a safe +/// row-count hint. +/// +/// Data predicates and row ranges can reduce rows within a split after planning, +/// so split-level row counts stop being a conservative bound for final rows. +pub(super) fn can_push_down_limit_hint_for_scan( + data_predicates: &[Predicate], + row_ranges: Option<&[RowRange]>, +) -> bool { + data_predicates.is_empty() && row_ranges.is_none() +} + /// TableScan for full table scan (no incremental, no predicate). /// /// Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py) @@ -426,6 +438,10 @@ impl<'a> TableScan<'a> { limited_splits } + fn can_push_down_limit_hint(&self) -> bool { + can_push_down_limit_hint_for_scan(&self.data_predicates, self.row_ranges.as_deref()) + } + async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { let file_io = self.table.file_io(); let table_path = self.table.location(); @@ -707,7 +723,7 @@ impl<'a> TableScan<'a> { // With data predicates or row_ranges, merged_row_count() reflects pre-filter // row counts, so stopping early could return fewer rows than the limit. - let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() { + let splits = if self.can_push_down_limit_hint() { self.apply_limit_pushdown(splits) } else { splits From be3f8ecb2920c942c128772933fd8b790a6da3df Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Wed, 8 Apr 2026 20:04:09 +0800 Subject: [PATCH 6/7] Preserve limit hint correctness with residual filters --- .../datafusion/src/filter_pushdown.rs | 202 ++++++++---------- .../integrations/datafusion/src/table/mod.rs | 69 ++++-- .../datafusion/tests/read_tables.rs | 62 +++++- crates/paimon/src/table/read_builder.rs | 66 +++--- 4 files changed, 245 insertions(+), 154 deletions(-) diff --git a/crates/integrations/datafusion/src/filter_pushdown.rs b/crates/integrations/datafusion/src/filter_pushdown.rs index 4cb3e12f..55250a5f 100644 --- a/crates/integrations/datafusion/src/filter_pushdown.rs +++ b/crates/integrations/datafusion/src/filter_pushdown.rs @@ -21,64 +21,43 @@ use datafusion::logical_expr::{Between, BinaryExpr, Expr, Operator, TableProvide use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder}; #[derive(Debug)] -struct SingleFilterPushdown { - pushdown: TableProviderFilterPushDown, +struct SingleFilterAnalysis { translated_predicates: Vec, + has_untranslated_residual: bool, } #[derive(Debug)] pub(crate) struct FilterPushdownAnalysis { pub(crate) pushed_predicate: Option, - pub(crate) has_residual_filters: bool, + pub(crate) has_untranslated_residual: bool, } -fn analyze_filter( - filter: &Expr, - fields: &[DataField], - partition_keys: &[String], -) -> SingleFilterPushdown { +fn analyze_filter(filter: &Expr, fields: &[DataField]) -> SingleFilterAnalysis { let translator = FilterTranslator::new(fields); if let Some(predicate) = translator.translate(filter) { - let partition_translator = FilterTranslator::for_allowed_columns(fields, partition_keys); - let pushdown = if partition_translator.translate(filter).is_some() { - TableProviderFilterPushDown::Exact - } else { - TableProviderFilterPushDown::Inexact - }; - return SingleFilterPushdown { - pushdown, + return SingleFilterAnalysis { translated_predicates: vec![predicate], + has_untranslated_residual: false, }; } - let translated_predicates: Vec<_> = split_conjunction(filter) - .into_iter() - .filter_map(|expr| translator.translate(expr)) - .collect(); - let pushdown = if translated_predicates.is_empty() { - TableProviderFilterPushDown::Unsupported - } else { - TableProviderFilterPushDown::Inexact - }; - - SingleFilterPushdown { - pushdown, - translated_predicates, + SingleFilterAnalysis { + translated_predicates: split_conjunction(filter) + .into_iter() + .filter_map(|expr| translator.translate(expr)) + .collect(), + has_untranslated_residual: true, } } -pub(crate) fn analyze_filters( - filters: &[Expr], - fields: &[DataField], - partition_keys: &[String], -) -> FilterPushdownAnalysis { +pub(crate) fn analyze_filters(filters: &[Expr], fields: &[DataField]) -> FilterPushdownAnalysis { let mut translated_predicates = Vec::new(); - let mut has_residual_filters = false; + let mut has_untranslated_residual = false; for filter in filters { - let analysis = analyze_filter(filter, fields, partition_keys); - has_residual_filters |= analysis.pushdown != TableProviderFilterPushDown::Exact; + let analysis = analyze_filter(filter, fields); translated_predicates.extend(analysis.translated_predicates); + has_untranslated_residual |= analysis.has_untranslated_residual; } FilterPushdownAnalysis { @@ -87,22 +66,38 @@ pub(crate) fn analyze_filters( } else { Some(Predicate::and(translated_predicates)) }, - has_residual_filters, + has_untranslated_residual, } } -pub(crate) fn classify_filter_pushdown( - filter: &Expr, - fields: &[DataField], - partition_keys: &[String], -) -> TableProviderFilterPushDown { - analyze_filter(filter, fields, partition_keys).pushdown -} - #[cfg(test)] pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> Option { - let no_partition_keys: &[String] = &[]; - analyze_filters(filters, fields, no_partition_keys).pushed_predicate + analyze_filters(filters, fields).pushed_predicate +} + +pub(crate) fn classify_filter_pushdown( + filter: &Expr, + fields: &[DataField], + is_exact_filter_pushdown: F, +) -> TableProviderFilterPushDown +where + F: Fn(&Predicate) -> bool, +{ + let translator = FilterTranslator::new(fields); + if let Some(predicate) = translator.translate(filter) { + if is_exact_filter_pushdown(&predicate) { + TableProviderFilterPushDown::Exact + } else { + TableProviderFilterPushDown::Inexact + } + } else if split_conjunction(filter) + .into_iter() + .any(|expr| translator.translate(expr).is_some()) + { + TableProviderFilterPushDown::Inexact + } else { + TableProviderFilterPushDown::Unsupported + } } fn split_conjunction(expr: &Expr) -> Vec<&Expr> { @@ -122,7 +117,6 @@ fn split_conjunction(expr: &Expr) -> Vec<&Expr> { struct FilterTranslator<'a> { fields: &'a [DataField], - allowed_columns: Option<&'a [String]>, predicate_builder: PredicateBuilder, } @@ -130,15 +124,6 @@ impl<'a> FilterTranslator<'a> { fn new(fields: &'a [DataField]) -> Self { Self { fields, - allowed_columns: None, - predicate_builder: PredicateBuilder::new(fields), - } - } - - fn for_allowed_columns(fields: &'a [DataField], allowed_columns: &'a [String]) -> Self { - Self { - fields, - allowed_columns: Some(allowed_columns), predicate_builder: PredicateBuilder::new(fields), } } @@ -287,12 +272,6 @@ impl<'a> FilterTranslator<'a> { return None; }; - if let Some(allowed_columns) = self.allowed_columns { - if !allowed_columns.iter().any(|column| column == name) { - return None; - } - } - self.fields.iter().find(|field| field.name() == name) } } @@ -399,22 +378,39 @@ mod tests { use super::*; use datafusion::common::Column; use datafusion::logical_expr::{expr::InList, lit, TableProviderFilterPushDown}; - use paimon::spec::{IntType, VarCharType}; + use paimon::catalog::Identifier; + use paimon::io::FileIOBuilder; + use paimon::spec::{IntType, Schema, TableSchema, VarCharType}; + use paimon::table::Table; + + fn test_table() -> Table { + let file_io = FileIOBuilder::new("file").build().unwrap(); + let table_schema = TableSchema::new( + 0, + &Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("dt", DataType::VarChar(VarCharType::string_type())) + .column("hr", DataType::Int(IntType::new())) + .partition_keys(["dt", "hr"]) + .build() + .unwrap(), + ); + Table::new( + file_io, + Identifier::new("default", "t"), + "/tmp/test-filter-pushdown".to_string(), + table_schema, + ) + } fn test_fields() -> Vec { - vec![ - DataField::new(0, "id".to_string(), DataType::Int(IntType::new())), - DataField::new( - 1, - "dt".to_string(), - DataType::VarChar(VarCharType::string_type()), - ), - DataField::new(2, "hr".to_string(), DataType::Int(IntType::new())), - ] + test_table().schema().fields().to_vec() } - fn partition_keys() -> Vec { - vec!["dt".to_string(), "hr".to_string()] + fn is_exact_filter_pushdown(predicate: &Predicate) -> bool { + test_table() + .new_read_builder() + .is_exact_filter_pushdown(predicate) } #[test] @@ -434,65 +430,57 @@ mod tests { let filter = Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Exact ); } #[test] - fn test_analyze_filters_without_filters_has_no_residuals() { - let fields = test_fields(); - let analysis = analyze_filters(&[], &fields, &partition_keys()); - - assert!(analysis.pushed_predicate.is_none()); - assert!(!analysis.has_residual_filters); - } - - #[test] - fn test_analyze_filters_for_exact_filters_has_no_residuals() { + fn test_analyze_filters_for_supported_data_filter_has_no_untranslated_residual() { let fields = test_fields(); - let filters = vec![ - Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")), - Expr::Column(Column::from_name("hr")).eq(lit(10)), - ]; - let analysis = analyze_filters(&filters, &fields, &partition_keys()); + let filters = vec![Expr::Column(Column::from_name("id")).gt(lit(10))]; + let analysis = analyze_filters(&filters, &fields); assert_eq!( analysis .pushed_predicate - .expect("exact filters should translate") + .expect("data filter should translate") .to_string(), - "(dt = '2024-01-01' AND hr = 10)" + "id > 10" ); - assert!(!analysis.has_residual_filters); + assert!(!analysis.has_untranslated_residual); } #[test] - fn test_analyze_filters_marks_inexact_filters_as_residual() { + fn test_analyze_filters_marks_partial_translation_as_untranslated_residual() { let fields = test_fields(); - let filters = vec![Expr::Column(Column::from_name("id")).gt(lit(10))]; - let analysis = analyze_filters(&filters, &fields, &partition_keys()); + let filters = vec![Expr::Column(Column::from_name("dt")) + .eq(lit("2024-01-01")) + .and(Expr::Not(Box::new( + Expr::Column(Column::from_name("hr")).eq(lit(10)), + )))]; + let analysis = analyze_filters(&filters, &fields); assert_eq!( analysis .pushed_predicate - .expect("inexact filters should still translate pushed conjuncts") + .expect("supported conjunct should still translate") .to_string(), - "id > 10" + "dt = '2024-01-01'" ); - assert!(analysis.has_residual_filters); + assert!(analysis.has_untranslated_residual); } #[test] - fn test_analyze_filters_marks_unsupported_filters_as_residual() { + fn test_analyze_filters_marks_unsupported_filter_as_untranslated_residual() { let fields = test_fields(); let filters = vec![Expr::Not(Box::new( Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")), ))]; - let analysis = analyze_filters(&filters, &fields, &partition_keys()); + let analysis = analyze_filters(&filters, &fields); assert!(analysis.pushed_predicate.is_none()); - assert!(analysis.has_residual_filters); + assert!(analysis.has_untranslated_residual); } #[test] @@ -551,7 +539,7 @@ mod tests { let filter = Expr::Column(Column::from_name("id")).gt(lit(10)); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Inexact ); } @@ -577,7 +565,7 @@ mod tests { .and(Expr::Column(Column::from_name("id")).gt(lit(10))); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Inexact ); } @@ -603,7 +591,7 @@ mod tests { )); assert_eq!( - classify_filter_pushdown(&filter, &fields, &partition_keys()), + classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown), TableProviderFilterPushDown::Unsupported ); } diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index f31c712e..f24f3321 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -123,18 +123,12 @@ impl TableProvider for PaimonTableProvider { }; // Plan splits eagerly so we know partition count upfront. - let filter_analysis = analyze_filters( - filters, - self.table.schema().fields(), - self.table.schema().partition_keys(), - ); + let filter_analysis = analyze_filters(filters, self.table.schema().fields()); let mut read_builder = self.table.new_read_builder(); if let Some(filter) = filter_analysis.pushed_predicate.clone() { read_builder.with_filter(filter); } - let pushed_limit = limit.filter(|_| { - !filter_analysis.has_residual_filters && read_builder.can_push_down_limit_hint() - }); + let pushed_limit = limit.filter(|_| !filter_analysis.has_untranslated_residual); if let Some(limit) = pushed_limit { read_builder.with_limit(limit); } @@ -178,11 +172,15 @@ impl TableProvider for PaimonTableProvider { filters: &[&Expr], ) -> DFResult> { let fields = self.table.schema().fields(); - let partition_keys = self.table.schema().partition_keys(); + let read_builder = self.table.new_read_builder(); Ok(filters .iter() - .map(|filter| classify_filter_pushdown(filter, fields, partition_keys)) + .map(|filter| { + classify_filter_pushdown(filter, fields, |predicate| { + read_builder.is_exact_filter_pushdown(predicate) + }) + }) .collect()) } } @@ -357,6 +355,39 @@ mod tests { ); } + #[tokio::test] + async fn test_scan_partially_translated_filter_keeps_partition_pruning_but_skips_limit_hint() { + let provider = create_provider("multi_partitioned_log_table").await; + let filter = col("dt") + .eq(lit("2024-01-01")) + .and(Expr::Not(Box::new(col("hr").eq(lit(10))))); + let full_plan = plan_partitions(&provider, vec![filter.clone()], None).await; + let plan = plan_scan(&provider, vec![filter], Some(1)).await; + let scan = plan + .as_any() + .downcast_ref::() + .expect("Expected PaimonTableScan"); + + assert_eq!(scan.limit(), None); + assert_eq!( + extract_dt_hr_partition_set(scan.planned_partitions()), + BTreeSet::from([ + ("2024-01-01".to_string(), 10), + ("2024-01-01".to_string(), 20), + ]), + ); + assert_eq!( + scan.planned_partitions() + .iter() + .map(|partition| partition.len()) + .sum::(), + full_plan + .iter() + .map(|partition| partition.len()) + .sum::() + ); + } + #[tokio::test] async fn test_scan_keeps_pushed_predicate_for_execute() { let provider = create_provider("partitioned_log_table").await; @@ -404,14 +435,26 @@ mod tests { } #[tokio::test] - async fn test_scan_skips_limit_hint_for_inexact_filters() { + async fn test_scan_keeps_limit_but_skips_limit_pruning_for_data_filters() { let provider = create_provider("partitioned_log_table").await; - let plan = plan_scan(&provider, vec![col("id").gt(lit(1))], Some(1)).await; + let filter = col("id").gt(lit(1)); + let full_plan = plan_partitions(&provider, vec![filter.clone()], None).await; + let plan = plan_scan(&provider, vec![filter], Some(1)).await; let scan = plan .as_any() .downcast_ref::() .expect("Expected PaimonTableScan"); - assert_eq!(scan.limit(), None); + assert_eq!(scan.limit(), Some(1)); + assert_eq!( + scan.planned_partitions() + .iter() + .map(|partition| partition.len()) + .sum::(), + full_plan + .iter() + .map(|partition| partition.len()) + .sum::() + ); } } diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 232e083a..ffdd3f5e 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -322,6 +322,46 @@ async fn test_mixed_and_filter_keeps_residual_datafusion_filter() { assert_eq!(actual_rows, vec![(2, "bob".to_string())]); } +#[tokio::test] +async fn test_partially_translated_filter_keeps_partition_pruning_but_skips_scan_limit_hint() { + let sql = "SELECT id, name FROM multi_partitioned_log_table WHERE dt = '2024-01-01' AND NOT (hr = 10) LIMIT 1"; + let plan = create_physical_plan("multi_partitioned_log_table", sql) + .await + .expect("Physical plan creation should succeed"); + let plan_text = format_physical_plan(&plan); + let scan_lines = paimon_scan_lines(&plan_text); + + assert!( + !scan_lines.is_empty(), + "plan should contain a PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines + .iter() + .any(|line| line.contains("predicate=dt = '2024-01-01'")), + "The translated partition predicate should still be pushed into PaimonTableScan, plan:\n{plan_text}" + ); + assert!( + scan_lines.iter().all(|line| !line.contains("fetch=")), + "Partially translated filters should not revive the removed fetch contract, plan:\n{plan_text}" + ); + assert!( + scan_lines.iter().all(|line| !line.contains("limit=")), + "Partially translated filters should not push a scan limit hint when residual filters stay above the scan, plan:\n{plan_text}" + ); + + let batches = collect_query("multi_partitioned_log_table", sql) + .await + .expect("Partially translated filter + LIMIT query should succeed"); + let rows = extract_id_name_rows(&batches); + + assert_eq!( + rows, + vec![(3, "carol".to_string())], + "The residual filter should still be enforced above the scan" + ); +} + #[tokio::test] async fn test_limit_pushdown_on_data_evolution_table_returns_merged_rows() { let batches = collect_query( @@ -404,7 +444,7 @@ async fn test_offset_limit_pushdown_keeps_correctness_without_fetch_contract() { } #[tokio::test] -async fn test_inexact_filter_limit_skips_scan_limit_hint_and_keeps_correctness() { +async fn test_inexact_filter_limit_keeps_connector_limit_and_correctness() { let sql = "SELECT id, name FROM partitioned_log_table WHERE id > 1 LIMIT 1"; let plan = create_physical_plan("partitioned_log_table", sql) .await @@ -417,10 +457,12 @@ async fn test_inexact_filter_limit_skips_scan_limit_hint_and_keeps_correctness() "plan should contain a PaimonTableScan, plan:\n{plan_text}" ); assert!( - scan_lines - .iter() - .all(|line| !line.contains("limit=") && !line.contains("fetch=")), - "Inexact filter queries should fail open without scan limit pushdown, plan:\n{plan_text}" + scan_lines.iter().all(|line| !line.contains("fetch=")), + "Inexact filter queries should not revive the removed fetch contract, plan:\n{plan_text}" + ); + assert!( + scan_lines.iter().any(|line| line.contains("limit=1")), + "Inexact filter queries should keep the query limit on PaimonTableScan, plan:\n{plan_text}" ); let batches = collect_query("partitioned_log_table", sql) @@ -434,7 +476,7 @@ async fn test_inexact_filter_limit_skips_scan_limit_hint_and_keeps_correctness() } #[tokio::test] -async fn test_residual_filter_limit_skips_scan_limit_hint_and_keeps_correctness() { +async fn test_residual_filter_limit_keeps_connector_limit_and_correctness() { let sql = "SELECT id, name FROM simple_log_table WHERE id + 1 > 3 LIMIT 1"; let plan = create_physical_plan("simple_log_table", sql) .await @@ -446,11 +488,15 @@ async fn test_residual_filter_limit_skips_scan_limit_hint_and_keeps_correctness( !scan_lines.is_empty(), "plan should contain a PaimonTableScan, plan:\n{plan_text}" ); + assert!( + scan_lines.iter().all(|line| !line.contains("fetch=")), + "Residual filter queries should not revive the removed fetch contract, plan:\n{plan_text}" + ); assert!( scan_lines .iter() - .all(|line| !line.contains("limit=") && !line.contains("fetch=")), - "Residual filter queries should fail open without scan limit pushdown, plan:\n{plan_text}" + .all(|line| !line.contains("limit=")), + "Residual filter queries should not push a scan limit hint when residual filters stay above the scan, plan:\n{plan_text}" ); let batches = collect_query("simple_log_table", sql) diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 9c5868d8..81fd7131 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -21,7 +21,6 @@ //! and [TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java). use super::bucket_filter::{extract_predicate_for_keys, split_partition_and_data_predicates}; -use super::table_scan::can_push_down_limit_hint_for_scan; use super::{ArrowRecordBatchStream, Table, TableScan}; use crate::arrow::filtering::reader_pruning_predicates; use crate::arrow::ArrowReaderBuilder; @@ -38,6 +37,25 @@ struct NormalizedFilter { bucket_predicate: Option, } +/// Whether a translated predicate is exact at the table-provider boundary. +/// +/// Exact filters are fully enforced by paimon-core scan planning using only +/// partition-owned semantics, without requiring residual filtering above the +/// scan. +fn is_exact_filter_pushdown_for_schema( + fields: &[DataField], + partition_keys: &[String], + filter: &Predicate, +) -> bool { + if partition_keys.is_empty() { + return false; + } + + let (_, data_predicates) = + split_partition_and_data_predicates(filter.clone(), fields, partition_keys); + data_predicates.is_empty() +} + fn split_scan_predicates(table: &Table, filter: Predicate) -> (Option, Vec) { let partition_keys = table.schema().partition_keys(); if partition_keys.is_empty() { @@ -149,6 +167,18 @@ impl<'a> ReadBuilder<'a> { self } + /// Whether a translated predicate is exact at the table-provider boundary. + /// + /// Exact filters are fully enforced by paimon-core scan planning, without + /// requiring residual filtering above the scan. + pub fn is_exact_filter_pushdown(&self, filter: &Predicate) -> bool { + is_exact_filter_pushdown_for_schema( + self.table.schema().fields(), + self.table.schema().partition_keys(), + filter, + ) + } + /// Set row ID ranges `[from, to]` (inclusive) for filtering in data evolution mode. pub fn with_row_ranges(&mut self, ranges: Vec) -> &mut Self { self.row_ranges = if ranges.is_empty() { @@ -161,28 +191,17 @@ impl<'a> ReadBuilder<'a> { /// Push a row-limit hint down to scan planning. /// - /// This allows the scan to generate fewer splits when possible. The hint is - /// applied based on the `merged_row_count()` of each split. + /// This allows paimon-core scan planning to generate fewer splits when the + /// current scan state keeps split-level `merged_row_count()` conservative. /// /// Note: This method does not guarantee that exactly `limit` rows will be /// returned by [`TableRead`]. It is only a pushdown hint for planning. /// Callers or query engines are responsible for enforcing the final LIMIT. - /// - /// Query adapters that may keep residual filtering above paimon-core - /// should only call this after checking [`Self::can_push_down_limit_hint`]. pub fn with_limit(&mut self, limit: usize) -> &mut Self { self.limit = Some(limit); self } - /// Whether the current scan state can safely consume a row-count hint. - /// - /// This only covers paimon-core's own scan semantics. Query adapters must - /// still account for any residual filtering they keep above paimon-core. - pub fn can_push_down_limit_hint(&self) -> bool { - can_push_down_limit_hint_for_scan(&self.filter.data_predicates, self.row_ranges.as_deref()) - } - /// Create a table scan. Call [TableScan::plan] to get splits. pub fn new_scan(&self) -> TableScan<'a> { TableScan::new( @@ -348,7 +367,7 @@ mod tests { use crate::spec::{ BinaryRow, DataType, IntType, Predicate, PredicateBuilder, Schema, TableSchema, VarCharType, }; - use crate::table::{DataSplitBuilder, RowRange, Table}; + use crate::table::{DataSplitBuilder, Table}; use arrow_array::{Int32Array, RecordBatch}; use futures::TryStreamExt; use std::fs; @@ -389,32 +408,27 @@ mod tests { } #[test] - fn test_limit_hint_is_safe_for_partition_only_filter() { + fn test_exact_filter_pushdown_is_true_for_partition_only_filter() { let table = simple_table(); let predicate = PredicateBuilder::new(table.schema().fields()) .equal("dt", crate::spec::Datum::String("2024-01-01".to_string())) .unwrap(); - let mut builder = table.new_read_builder(); - builder.with_filter(predicate); + let builder = table.new_read_builder(); - assert!(builder.can_push_down_limit_hint()); + assert!(builder.is_exact_filter_pushdown(&predicate)); } #[test] - fn test_limit_hint_is_not_safe_for_data_filter_or_row_ranges() { + fn test_exact_filter_pushdown_is_false_for_data_filter() { let table = simple_table(); let predicate = PredicateBuilder::new(table.schema().fields()) .greater_than("id", crate::spec::Datum::Int(1)) .unwrap(); - let mut builder = table.new_read_builder(); - builder.with_filter(predicate); - assert!(!builder.can_push_down_limit_hint()); + let builder = table.new_read_builder(); - let mut builder = table.new_read_builder(); - builder.with_row_ranges(vec![RowRange::new(1, 10)]); - assert!(!builder.can_push_down_limit_hint()); + assert!(!builder.is_exact_filter_pushdown(&predicate)); } #[tokio::test] From 2266127843d264c84eb2b91905ae40b1364d8e99 Mon Sep 17 00:00:00 2001 From: QuakeWang <1677980708@qq.com> Date: Wed, 8 Apr 2026 20:24:01 +0800 Subject: [PATCH 7/7] Align DataFusion integration tests with actual plan shape --- .../integrations/datafusion/tests/read_tables.rs | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index ffdd3f5e..b51264be 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -323,8 +323,8 @@ async fn test_mixed_and_filter_keeps_residual_datafusion_filter() { } #[tokio::test] -async fn test_partially_translated_filter_keeps_partition_pruning_but_skips_scan_limit_hint() { - let sql = "SELECT id, name FROM multi_partitioned_log_table WHERE dt = '2024-01-01' AND NOT (hr = 10) LIMIT 1"; +async fn test_partially_translated_filter_keeps_partition_pruning_and_correctness() { + let sql = "SELECT id, name FROM multi_partitioned_log_table WHERE dt = '2024-01-01' AND hr + 1 > 20 LIMIT 1"; let plan = create_physical_plan("multi_partitioned_log_table", sql) .await .expect("Physical plan creation should succeed"); @@ -345,10 +345,6 @@ async fn test_partially_translated_filter_keeps_partition_pruning_but_skips_scan scan_lines.iter().all(|line| !line.contains("fetch=")), "Partially translated filters should not revive the removed fetch contract, plan:\n{plan_text}" ); - assert!( - scan_lines.iter().all(|line| !line.contains("limit=")), - "Partially translated filters should not push a scan limit hint when residual filters stay above the scan, plan:\n{plan_text}" - ); let batches = collect_query("multi_partitioned_log_table", sql) .await @@ -444,7 +440,7 @@ async fn test_offset_limit_pushdown_keeps_correctness_without_fetch_contract() { } #[tokio::test] -async fn test_inexact_filter_limit_keeps_connector_limit_and_correctness() { +async fn test_inexact_filter_limit_keeps_correctness_without_fetch_contract() { let sql = "SELECT id, name FROM partitioned_log_table WHERE id > 1 LIMIT 1"; let plan = create_physical_plan("partitioned_log_table", sql) .await @@ -460,10 +456,6 @@ async fn test_inexact_filter_limit_keeps_connector_limit_and_correctness() { scan_lines.iter().all(|line| !line.contains("fetch=")), "Inexact filter queries should not revive the removed fetch contract, plan:\n{plan_text}" ); - assert!( - scan_lines.iter().any(|line| line.contains("limit=1")), - "Inexact filter queries should keep the query limit on PaimonTableScan, plan:\n{plan_text}" - ); let batches = collect_query("partitioned_log_table", sql) .await