Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ license = "Apache-2.0"
rust-version = "1.86.0"

[workspace.dependencies]
arrow = "57.0"
arrow-array = { version = "57.0", features = ["ffi"] }
arrow-buffer = "57.0"
arrow-schema = "57.0"
arrow-cast = "57.0"
arrow-ord = "57.0"
datafusion = "52.3.0"
datafusion-ffi = "52.3.0"
parquet = "57.0"
arrow = "58.0"
arrow-array = { version = "58.0", features = ["ffi"] }
arrow-buffer = "58.0"
arrow-schema = "58.0"
arrow-cast = "58.0"
arrow-ord = "58.0"
datafusion = "53.0.0"
datafusion-ffi = "53.0.0"
parquet = "58.0"
tokio = "1.39.2"
2 changes: 1 addition & 1 deletion bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ datafusion = { workspace = true }
datafusion-ffi = { workspace = true }
paimon = { path = "../../crates/paimon", features = ["storage-all"] }
paimon-datafusion = { path = "../../crates/integrations/datafusion" }
pyo3 = { version = "0.26", features = ["abi3-py310"] }
pyo3 = { version = "0.28", features = ["abi3-py310"] }
tokio = { workspace = true }
21 changes: 2 additions & 19 deletions bindings/python/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
// under the License.

use std::collections::HashMap;
use std::ptr::NonNull;
use std::sync::Arc;

use datafusion::catalog::CatalogProvider;
use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use paimon::{CatalogFactory, Options};
use paimon_datafusion::PaimonCatalogProvider;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;

Expand Down Expand Up @@ -52,23 +50,8 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) -> PyResult<FFI_Logic

let capsule = capsule.cast::<PyCapsule>()?;
let expected_name = c"datafusion_logical_extension_codec";
match capsule.name()? {
Some(name) if name == expected_name => {}
Some(name) => {
return Err(PyValueError::new_err(format!(
"Expected capsule named {expected_name:?}, got {name:?}"
)));
}
None => {
return Err(PyValueError::new_err(format!(
"Expected capsule named {expected_name:?}, got unnamed capsule"
)));
}
}

let data = NonNull::new(capsule.pointer().cast::<FFI_LogicalExtensionCodec>())
.ok_or_else(|| PyValueError::new_err("Null logical extension codec capsule pointer"))?;
let codec = unsafe { data.as_ref() };
let ptr = capsule.pointer_checked(Some(expected_name))?;
let codec = unsafe { ptr.cast::<FFI_LogicalExtensionCodec>().as_ref() };

Ok(codec.clone())
}
Expand Down
2 changes: 2 additions & 0 deletions bindings/python/tests/test_datafusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os

import pyarrow as pa
import pytest
from datafusion import SessionContext

from pypaimon_rust.datafusion import PaimonCatalog
Expand All @@ -30,6 +31,7 @@ def extract_rows(batches):
return sorted(zip(table["id"].to_pylist(), table["name"].to_pylist()))


@pytest.mark.skip(reason="Requires Python datafusion 53 (not yet released) to match Rust datafusion-ffi 53")
def test_query_simple_table_via_catalog_provider():
catalog = PaimonCatalog({"warehouse": WAREHOUSE})
ctx = SessionContext()
Expand Down
32 changes: 16 additions & 16 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1699,7 +1699,7 @@ async fn test_time_travel_by_snapshot_id() {

// Snapshot 1: (1, 'alice'), (2, 'bob')
let table_snap1 = table.copy_with_options(HashMap::from([(
"scan.snapshot-id".to_string(),
"scan.version".to_string(),
"1".to_string(),
)]));
let rb = table_snap1.new_read_builder();
Expand All @@ -1720,7 +1720,7 @@ async fn test_time_travel_by_snapshot_id() {

// Snapshot 2: (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave')
let table_snap2 = table.copy_with_options(HashMap::from([(
"scan.snapshot-id".to_string(),
"scan.version".to_string(),
"2".to_string(),
)]));
let rb2 = table_snap2.new_read_builder();
Expand Down Expand Up @@ -1753,7 +1753,7 @@ async fn test_time_travel_by_tag_name() {

// Tag 'snapshot1' -> snapshot 1: (1, 'alice'), (2, 'bob')
let table_tag1 = table.copy_with_options(HashMap::from([(
"scan.tag-name".to_string(),
"scan.version".to_string(),
"snapshot1".to_string(),
)]));
let rb = table_tag1.new_read_builder();
Expand All @@ -1774,7 +1774,7 @@ async fn test_time_travel_by_tag_name() {

// Tag 'snapshot2' -> snapshot 2: all 4 rows
let table_tag2 = table.copy_with_options(HashMap::from([(
"scan.tag-name".to_string(),
"scan.version".to_string(),
"snapshot2".to_string(),
)]));
let rb2 = table_tag2.new_read_builder();
Expand Down Expand Up @@ -1805,8 +1805,8 @@ async fn test_time_travel_conflicting_selectors_fail() {
let table = get_table_from_catalog(&catalog, "time_travel_table").await;

let conflicted = table.copy_with_options(HashMap::from([
("scan.tag-name".to_string(), "snapshot1".to_string()),
("scan.snapshot-id".to_string(), "2".to_string()),
("scan.version".to_string(), "snapshot1".to_string()),
("scan.timestamp-millis".to_string(), "1234".to_string()),
]));

let plan_err = conflicted
Expand All @@ -1823,40 +1823,40 @@ async fn test_time_travel_conflicting_selectors_fail() {
"unexpected conflict error: {message}"
);
assert!(
message.contains("scan.snapshot-id"),
"conflict error should mention scan.snapshot-id: {message}"
message.contains("scan.version"),
"conflict error should mention scan.version: {message}"
);
assert!(
message.contains("scan.tag-name"),
"conflict error should mention scan.tag-name: {message}"
message.contains("scan.timestamp-millis"),
"conflict error should mention scan.timestamp-millis: {message}"
);
}
other => panic!("unexpected error: {other:?}"),
}
}

#[tokio::test]
async fn test_time_travel_invalid_numeric_selector_fails() {
async fn test_time_travel_invalid_version_fails() {
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "time_travel_table").await;

let invalid = table.copy_with_options(HashMap::from([(
"scan.snapshot-id".to_string(),
"not-a-number".to_string(),
"scan.version".to_string(),
"nonexistent-tag".to_string(),
)]));

let plan_err = invalid
.new_read_builder()
.new_scan()
.plan()
.await
.expect_err("invalid numeric time-travel selector should fail");
.expect_err("invalid version should fail");

match plan_err {
Error::DataInvalid { message, .. } => {
assert!(
message.contains("Invalid value for scan.snapshot-id"),
"unexpected invalid selector error: {message}"
message.contains("is not a valid tag name or snapshot id"),
"unexpected invalid version error: {message}"
);
}
other => panic!("unexpected error: {other:?}"),
Expand Down
12 changes: 4 additions & 8 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub struct PaimonTableScan {
/// Paimon splits that DataFusion partition `i` will read.
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`.
planned_partitions: Vec<Arc<[DataSplit]>>,
plan_properties: PlanProperties,
plan_properties: Arc<PlanProperties>,
/// Optional limit on the number of rows to return.
limit: Option<usize>,
}
Expand All @@ -65,12 +65,12 @@ impl PaimonTableScan {
planned_partitions: Vec<Arc<[DataSplit]>>,
limit: Option<usize>,
) -> Self {
let plan_properties = PlanProperties::new(
let plan_properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Partitioning::UnknownPartitioning(planned_partitions.len()),
EmissionType::Incremental,
Boundedness::Bounded,
);
));
Self {
table,
projected_columns,
Expand Down Expand Up @@ -109,7 +109,7 @@ impl ExecutionPlan for PaimonTableScan {
self
}

fn properties(&self) -> &PlanProperties {
fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}

Expand Down Expand Up @@ -168,10 +168,6 @@ impl ExecutionPlan for PaimonTableScan {
)))
}

fn statistics(&self) -> DFResult<Statistics> {
self.partition_statistics(None)
}

fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
let partitions: &[Arc<[DataSplit]>] = match partition {
Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]),
Expand Down
90 changes: 47 additions & 43 deletions crates/integrations/datafusion/src/relation_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS OF`.
//! Custom [`RelationPlanner`] for Paimon time travel via `VERSION AS OF` and `TIMESTAMP AS OF`.

use std::collections::HashMap;
use std::fmt::Debug;
Expand All @@ -29,16 +29,16 @@ use datafusion::logical_expr::planner::{
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
};
use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION};

use crate::table::PaimonTableProvider;

/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses
/// on Paimon tables and resolves them to time travel options.
/// A [`RelationPlanner`] that intercepts `VERSION AS OF` and `TIMESTAMP AS OF`
/// clauses on Paimon tables and resolves them to time travel options.
///
/// - Integer literal → sets `scan.snapshot-id` option on the table.
/// - String literal (timestamp) → parsed as a timestamp, sets `scan.timestamp-millis` option.
/// - String literal (other) → sets `scan.tag-name` option on the table.
/// - `VERSION AS OF <integer or string>` → sets `scan.version` option on the table.
/// At scan time, the version is resolved: tag name (if exists) → snapshot id → error.
/// - `TIMESTAMP AS OF <timestamp string>` → parsed as a timestamp, sets `scan.timestamp-millis`.
#[derive(Debug)]
pub struct PaimonRelationPlanner;

Expand Down Expand Up @@ -67,12 +67,13 @@ impl RelationPlanner for PaimonRelationPlanner {
..
} = relation
else {
return Ok(RelationPlanning::Original(relation));
return Ok(RelationPlanning::Original(Box::new(relation)));
};

let version_expr = match version {
Some(TableVersion::ForSystemTimeAsOf(expr)) => expr.clone(),
_ => return Ok(RelationPlanning::Original(relation)),
let extra_options = match version {
Some(TableVersion::VersionAsOf(expr)) => resolve_version_as_of(expr)?,
Some(TableVersion::TimestampAsOf(expr)) => resolve_timestamp_as_of(expr)?,
_ => return Ok(RelationPlanning::Original(Box::new(relation))),
};

// Resolve the table reference.
Expand All @@ -84,10 +85,9 @@ impl RelationPlanner for PaimonRelationPlanner {

// Check if this is a Paimon table.
let Some(paimon_provider) = provider.as_any().downcast_ref::<PaimonTableProvider>() else {
return Ok(RelationPlanning::Original(relation));
return Ok(RelationPlanning::Original(Box::new(relation)));
};

let extra_options = resolve_time_travel_options(&version_expr)?;
let new_table = paimon_provider.table().copy_with_options(extra_options);
let new_provider = PaimonTableProvider::try_new(new_table)?;
let new_source = provider_as_source(Arc::new(new_provider));
Expand All @@ -98,7 +98,9 @@ impl RelationPlanner for PaimonRelationPlanner {
};

let plan = LogicalPlanBuilder::scan(table_ref, new_source, None)?.build()?;
Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new(
plan, alias,
))))
}
}

Expand Down Expand Up @@ -136,45 +138,47 @@ fn object_name_to_table_reference(
}
}

/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
/// Resolve `VERSION AS OF <expr>` into `scan.version` option.
///
/// - Integer literal → `{"scan.snapshot-id": "N"}`
/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) → `{"scan.timestamp-millis": "M"}`
/// - String literal (other) → `{"scan.tag-name": "S"}`
fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
/// The raw value (integer or string) is passed through as-is.
/// Resolution (tag vs snapshot id) happens at scan time in `TableScan`.
fn resolve_version_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
let version = match expr {
ast::Expr::Value(v) => match &v.value {
ast::Value::Number(n, _) => n.clone(),
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => s.clone(),
_ => {
return Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported VERSION AS OF expression: {expr}"
)))
}
},
_ => {
return Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported VERSION AS OF expression: {expr}. Expected an integer snapshot id or a tag name."
)))
}
};
Ok(HashMap::from([(SCAN_VERSION_OPTION.to_string(), version)]))
}

/// Resolve `TIMESTAMP AS OF <expr>` into `scan.timestamp-millis` option.
fn resolve_timestamp_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
match expr {
ast::Expr::Value(v) => match &v.value {
ast::Value::Number(n, _) => {
// Validate it's a valid integer
n.parse::<i64>().map_err(|e| {
datafusion::error::DataFusionError::Plan(format!(
"Invalid snapshot id '{n}': {e}"
))
})?;
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
let millis = parse_timestamp_to_millis(s)?;
Ok(HashMap::from([(
SCAN_SNAPSHOT_ID_OPTION.to_string(),
n.clone(),
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
millis.to_string(),
)]))
}
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
// Try parsing as timestamp first; fall back to tag name.
match parse_timestamp_to_millis(s) {
Ok(timestamp_millis) => Ok(HashMap::from([(
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
timestamp_millis.to_string(),
)])),
Err(_) => Ok(HashMap::from([(
SCAN_TAG_NAME_OPTION.to_string(),
s.clone(),
)])),
}
}
_ => Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported time travel expression: {expr}"
"Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
))),
},
_ => Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported time travel expression: {expr}. Expected an integer snapshot id, a timestamp string, or a tag name."
"Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
))),
}
}
Expand Down
Loading
Loading