Skip to content
197 changes: 144 additions & 53 deletions crates/integrations/datafusion/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,72 @@ use datafusion::logical_expr::expr::InList;
use datafusion::logical_expr::{Between, BinaryExpr, Expr, Operator, TableProviderFilterPushDown};
use paimon::spec::{DataField, DataType, Datum, Predicate, PredicateBuilder};

pub(crate) fn classify_filter_pushdown(
#[derive(Debug)]
struct SingleFilterAnalysis {
translated_predicates: Vec<Predicate>,
has_untranslated_residual: bool,
}

#[derive(Debug)]
pub(crate) struct FilterPushdownAnalysis {
pub(crate) pushed_predicate: Option<Predicate>,
pub(crate) has_untranslated_residual: bool,
}

fn analyze_filter(filter: &Expr, fields: &[DataField]) -> SingleFilterAnalysis {
let translator = FilterTranslator::new(fields);
if let Some(predicate) = translator.translate(filter) {
return SingleFilterAnalysis {
translated_predicates: vec![predicate],
has_untranslated_residual: false,
};
}

SingleFilterAnalysis {
translated_predicates: split_conjunction(filter)
.into_iter()
.filter_map(|expr| translator.translate(expr))
.collect(),
has_untranslated_residual: true,
}
}

pub(crate) fn analyze_filters(filters: &[Expr], fields: &[DataField]) -> FilterPushdownAnalysis {
let mut translated_predicates = Vec::new();
let mut has_untranslated_residual = false;

for filter in filters {
let analysis = analyze_filter(filter, fields);
translated_predicates.extend(analysis.translated_predicates);
has_untranslated_residual |= analysis.has_untranslated_residual;
}

FilterPushdownAnalysis {
pushed_predicate: if translated_predicates.is_empty() {
None
} else {
Some(Predicate::and(translated_predicates))
},
has_untranslated_residual,
}
}

#[cfg(test)]
pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> Option<Predicate> {
analyze_filters(filters, fields).pushed_predicate
}

pub(crate) fn classify_filter_pushdown<F>(
filter: &Expr,
fields: &[DataField],
partition_keys: &[String],
) -> TableProviderFilterPushDown {
is_exact_filter_pushdown: F,
) -> TableProviderFilterPushDown
where
F: Fn(&Predicate) -> bool,
{
let translator = FilterTranslator::new(fields);
if translator.translate(filter).is_some() {
let partition_translator = FilterTranslator::for_allowed_columns(fields, partition_keys);
if partition_translator.translate(filter).is_some() {
if let Some(predicate) = translator.translate(filter) {
if is_exact_filter_pushdown(&predicate) {
TableProviderFilterPushDown::Exact
} else {
TableProviderFilterPushDown::Inexact
Expand All @@ -43,21 +100,6 @@ pub(crate) fn classify_filter_pushdown(
}
}

pub(crate) fn build_pushed_predicate(filters: &[Expr], fields: &[DataField]) -> Option<Predicate> {
let translator = FilterTranslator::new(fields);
let pushed: Vec<_> = filters
.iter()
.flat_map(split_conjunction)
.filter_map(|filter| translator.translate(filter))
.collect();

if pushed.is_empty() {
None
} else {
Some(Predicate::and(pushed))
}
}

fn split_conjunction(expr: &Expr) -> Vec<&Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr {
Expand All @@ -75,23 +117,13 @@ fn split_conjunction(expr: &Expr) -> Vec<&Expr> {

struct FilterTranslator<'a> {
fields: &'a [DataField],
allowed_columns: Option<&'a [String]>,
predicate_builder: PredicateBuilder,
}

impl<'a> FilterTranslator<'a> {
fn new(fields: &'a [DataField]) -> Self {
Self {
fields,
allowed_columns: None,
predicate_builder: PredicateBuilder::new(fields),
}
}

fn for_allowed_columns(fields: &'a [DataField], allowed_columns: &'a [String]) -> Self {
Self {
fields,
allowed_columns: Some(allowed_columns),
predicate_builder: PredicateBuilder::new(fields),
}
}
Expand Down Expand Up @@ -240,12 +272,6 @@ impl<'a> FilterTranslator<'a> {
return None;
};

if let Some(allowed_columns) = self.allowed_columns {
if !allowed_columns.iter().any(|column| column == name) {
return None;
}
}

self.fields.iter().find(|field| field.name() == name)
}
}
Expand Down Expand Up @@ -352,22 +378,39 @@ mod tests {
use super::*;
use datafusion::common::Column;
use datafusion::logical_expr::{expr::InList, lit, TableProviderFilterPushDown};
use paimon::spec::{IntType, VarCharType};
use paimon::catalog::Identifier;
use paimon::io::FileIOBuilder;
use paimon::spec::{IntType, Schema, TableSchema, VarCharType};
use paimon::table::Table;

fn test_table() -> Table {
let file_io = FileIOBuilder::new("file").build().unwrap();
let table_schema = TableSchema::new(
0,
&Schema::builder()
.column("id", DataType::Int(IntType::new()))
.column("dt", DataType::VarChar(VarCharType::string_type()))
.column("hr", DataType::Int(IntType::new()))
.partition_keys(["dt", "hr"])
.build()
.unwrap(),
);
Table::new(
file_io,
Identifier::new("default", "t"),
"/tmp/test-filter-pushdown".to_string(),
table_schema,
)
}

fn test_fields() -> Vec<DataField> {
vec![
DataField::new(0, "id".to_string(), DataType::Int(IntType::new())),
DataField::new(
1,
"dt".to_string(),
DataType::VarChar(VarCharType::string_type()),
),
DataField::new(2, "hr".to_string(), DataType::Int(IntType::new())),
]
test_table().schema().fields().to_vec()
}

fn partition_keys() -> Vec<String> {
vec!["dt".to_string(), "hr".to_string()]
fn is_exact_filter_pushdown(predicate: &Predicate) -> bool {
test_table()
.new_read_builder()
.is_exact_filter_pushdown(predicate)
}

#[test]
Expand All @@ -387,11 +430,59 @@ mod tests {
let filter = Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01"));

assert_eq!(
classify_filter_pushdown(&filter, &fields, &partition_keys()),
classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown),
TableProviderFilterPushDown::Exact
);
}

#[test]
fn test_analyze_filters_for_supported_data_filter_has_no_untranslated_residual() {
let fields = test_fields();
let filters = vec![Expr::Column(Column::from_name("id")).gt(lit(10))];
let analysis = analyze_filters(&filters, &fields);

assert_eq!(
analysis
.pushed_predicate
.expect("data filter should translate")
.to_string(),
"id > 10"
);
assert!(!analysis.has_untranslated_residual);
}

#[test]
fn test_analyze_filters_marks_partial_translation_as_untranslated_residual() {
let fields = test_fields();
let filters = vec![Expr::Column(Column::from_name("dt"))
.eq(lit("2024-01-01"))
.and(Expr::Not(Box::new(
Expr::Column(Column::from_name("hr")).eq(lit(10)),
)))];
let analysis = analyze_filters(&filters, &fields);

assert_eq!(
analysis
.pushed_predicate
.expect("supported conjunct should still translate")
.to_string(),
"dt = '2024-01-01'"
);
assert!(analysis.has_untranslated_residual);
}

#[test]
fn test_analyze_filters_marks_unsupported_filter_as_untranslated_residual() {
let fields = test_fields();
let filters = vec![Expr::Not(Box::new(
Expr::Column(Column::from_name("dt")).eq(lit("2024-01-01")),
))];
let analysis = analyze_filters(&filters, &fields);

assert!(analysis.pushed_predicate.is_none());
assert!(analysis.has_untranslated_residual);
}

#[test]
fn test_translate_reversed_partition_comparison() {
let fields = test_fields();
Expand Down Expand Up @@ -448,7 +539,7 @@ mod tests {
let filter = Expr::Column(Column::from_name("id")).gt(lit(10));

assert_eq!(
classify_filter_pushdown(&filter, &fields, &partition_keys()),
classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown),
TableProviderFilterPushDown::Inexact
);
}
Expand All @@ -474,7 +565,7 @@ mod tests {
.and(Expr::Column(Column::from_name("id")).gt(lit(10)));

assert_eq!(
classify_filter_pushdown(&filter, &fields, &partition_keys()),
classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown),
TableProviderFilterPushDown::Inexact
);
}
Expand All @@ -500,7 +591,7 @@ mod tests {
));

assert_eq!(
classify_filter_pushdown(&filter, &fields, &partition_keys()),
classify_filter_pushdown(&filter, &fields, is_exact_filter_pushdown),
TableProviderFilterPushDown::Unsupported
);
}
Expand Down
1 change: 0 additions & 1 deletion crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@ mod table;

pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
pub use error::to_datafusion_error;
pub use physical_plan::PaimonTableScan;
pub use relation_planner::PaimonRelationPlanner;
pub use table::PaimonTableProvider;
2 changes: 1 addition & 1 deletion crates/integrations/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

pub(crate) mod scan;

pub use scan::PaimonTableScan;
pub(crate) use scan::PaimonTableScan;
11 changes: 4 additions & 7 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::error::to_datafusion_error;
/// and the resulting splits are distributed across DataFusion execution partitions
/// so that DataFusion can schedule them in parallel.
#[derive(Debug)]
pub struct PaimonTableScan {
pub(crate) struct PaimonTableScan {
table: Table,
/// Projected column names (if None, reads all columns).
projected_columns: Option<Vec<String>>,
Expand All @@ -52,7 +52,7 @@ pub struct PaimonTableScan {
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
planned_partitions: Vec<Arc<[DataSplit]>>,
plan_properties: PlanProperties,
/// Optional limit on the number of rows to return.
/// Optional limit hint pushed to paimon-core planning.
limit: Option<usize>,
}

Expand Down Expand Up @@ -81,10 +81,6 @@ impl PaimonTableScan {
}
}

pub fn table(&self) -> &Table {
&self.table
}

#[cfg(test)]
pub(crate) fn planned_partitions(&self) -> &[Arc<[DataSplit]>] {
&self.planned_partitions
Expand All @@ -95,7 +91,8 @@ impl PaimonTableScan {
self.pushed_predicate.as_ref()
}

pub fn limit(&self) -> Option<usize> {
#[cfg(test)]
pub(crate) fn limit(&self) -> Option<usize> {
self.limit
}
}
Expand Down
Loading
Loading