diff --git a/Cargo.toml b/Cargo.toml index 5677a5eb..62bb1278 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,13 +28,13 @@ license = "Apache-2.0" rust-version = "1.86.0" [workspace.dependencies] -arrow = "57.0" -arrow-array = { version = "57.0", features = ["ffi"] } -arrow-buffer = "57.0" -arrow-schema = "57.0" -arrow-cast = "57.0" -arrow-ord = "57.0" -datafusion = "52.3.0" -datafusion-ffi = "52.3.0" -parquet = "57.0" +arrow = "58.0" +arrow-array = { version = "58.0", features = ["ffi"] } +arrow-buffer = "58.0" +arrow-schema = "58.0" +arrow-cast = "58.0" +arrow-ord = "58.0" +datafusion = "53.0.0" +datafusion-ffi = "53.0.0" +parquet = "58.0" tokio = "1.39.2" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 617c7f6b..6ed24065 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -32,5 +32,5 @@ datafusion = { workspace = true } datafusion-ffi = { workspace = true } paimon = { path = "../../crates/paimon", features = ["storage-all"] } paimon-datafusion = { path = "../../crates/integrations/datafusion" } -pyo3 = { version = "0.26", features = ["abi3-py310"] } +pyo3 = { version = "0.28", features = ["abi3-py310"] } tokio = { workspace = true } diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs index 6631ff61..2bc1e1da 100644 --- a/bindings/python/src/context.rs +++ b/bindings/python/src/context.rs @@ -16,7 +16,6 @@ // under the License. use std::collections::HashMap; -use std::ptr::NonNull; use std::sync::Arc; use datafusion::catalog::CatalogProvider; @@ -24,7 +23,6 @@ use datafusion_ffi::catalog_provider::FFI_CatalogProvider; use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use paimon::{CatalogFactory, Options}; use paimon_datafusion::PaimonCatalogProvider; -use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::PyCapsule; @@ -52,23 +50,8 @@ fn ffi_logical_codec_from_pycapsule(obj: Bound<'_, PyAny>) -> PyResult()?; let expected_name = c"datafusion_logical_extension_codec"; - match capsule.name()? { - Some(name) if name == expected_name => {} - Some(name) => { - return Err(PyValueError::new_err(format!( - "Expected capsule named {expected_name:?}, got {name:?}" - ))); - } - None => { - return Err(PyValueError::new_err(format!( - "Expected capsule named {expected_name:?}, got unnamed capsule" - ))); - } - } - - let data = NonNull::new(capsule.pointer().cast::()) - .ok_or_else(|| PyValueError::new_err("Null logical extension codec capsule pointer"))?; - let codec = unsafe { data.as_ref() }; + let ptr = capsule.pointer_checked(Some(expected_name))?; + let codec = unsafe { ptr.cast::().as_ref() }; Ok(codec.clone()) } diff --git a/bindings/python/tests/test_datafusion.py b/bindings/python/tests/test_datafusion.py index 919ef55c..d2135446 100644 --- a/bindings/python/tests/test_datafusion.py +++ b/bindings/python/tests/test_datafusion.py @@ -18,6 +18,7 @@ import os import pyarrow as pa +import pytest from datafusion import SessionContext from pypaimon_rust.datafusion import PaimonCatalog @@ -30,6 +31,7 @@ def extract_rows(batches): return sorted(zip(table["id"].to_pylist(), table["name"].to_pylist())) +@pytest.mark.skip(reason="Requires Python datafusion 53 (not yet released) to match Rust datafusion-ffi 53") def test_query_simple_table_via_catalog_provider(): catalog = PaimonCatalog({"warehouse": WAREHOUSE}) ctx = SessionContext() diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 5e2ac48f..9ed39cf2 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -1699,7 +1699,7 @@ async fn test_time_travel_by_snapshot_id() { // Snapshot 1: (1, 'alice'), (2, 'bob') let table_snap1 = table.copy_with_options(HashMap::from([( - "scan.snapshot-id".to_string(), + "scan.version".to_string(), "1".to_string(), )])); let rb = table_snap1.new_read_builder(); @@ -1720,7 +1720,7 @@ async fn test_time_travel_by_snapshot_id() { // Snapshot 2: (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave') let table_snap2 = table.copy_with_options(HashMap::from([( - "scan.snapshot-id".to_string(), + "scan.version".to_string(), "2".to_string(), )])); let rb2 = table_snap2.new_read_builder(); @@ -1753,7 +1753,7 @@ async fn test_time_travel_by_tag_name() { // Tag 'snapshot1' -> snapshot 1: (1, 'alice'), (2, 'bob') let table_tag1 = table.copy_with_options(HashMap::from([( - "scan.tag-name".to_string(), + "scan.version".to_string(), "snapshot1".to_string(), )])); let rb = table_tag1.new_read_builder(); @@ -1774,7 +1774,7 @@ async fn test_time_travel_by_tag_name() { // Tag 'snapshot2' -> snapshot 2: all 4 rows let table_tag2 = table.copy_with_options(HashMap::from([( - "scan.tag-name".to_string(), + "scan.version".to_string(), "snapshot2".to_string(), )])); let rb2 = table_tag2.new_read_builder(); @@ -1805,8 +1805,8 @@ async fn test_time_travel_conflicting_selectors_fail() { let table = get_table_from_catalog(&catalog, "time_travel_table").await; let conflicted = table.copy_with_options(HashMap::from([ - ("scan.tag-name".to_string(), "snapshot1".to_string()), - ("scan.snapshot-id".to_string(), "2".to_string()), + ("scan.version".to_string(), "snapshot1".to_string()), + ("scan.timestamp-millis".to_string(), "1234".to_string()), ])); let plan_err = conflicted @@ -1823,12 +1823,12 @@ async fn test_time_travel_conflicting_selectors_fail() { "unexpected conflict error: {message}" ); assert!( - message.contains("scan.snapshot-id"), - "conflict error should mention scan.snapshot-id: {message}" + message.contains("scan.version"), + "conflict error should mention scan.version: {message}" ); assert!( - message.contains("scan.tag-name"), - "conflict error should mention scan.tag-name: {message}" + message.contains("scan.timestamp-millis"), + "conflict error should mention scan.timestamp-millis: {message}" ); } other => panic!("unexpected error: {other:?}"), @@ -1836,13 +1836,13 @@ async fn test_time_travel_conflicting_selectors_fail() { } #[tokio::test] -async fn test_time_travel_invalid_numeric_selector_fails() { +async fn test_time_travel_invalid_version_fails() { let catalog = create_file_system_catalog(); let table = get_table_from_catalog(&catalog, "time_travel_table").await; let invalid = table.copy_with_options(HashMap::from([( - "scan.snapshot-id".to_string(), - "not-a-number".to_string(), + "scan.version".to_string(), + "nonexistent-tag".to_string(), )])); let plan_err = invalid @@ -1850,13 +1850,13 @@ async fn test_time_travel_invalid_numeric_selector_fails() { .new_scan() .plan() .await - .expect_err("invalid numeric time-travel selector should fail"); + .expect_err("invalid version should fail"); match plan_err { Error::DataInvalid { message, .. } => { assert!( - message.contains("Invalid value for scan.snapshot-id"), - "unexpected invalid selector error: {message}" + message.contains("is not a valid tag name or snapshot id"), + "unexpected invalid version error: {message}" ); } other => panic!("unexpected error: {other:?}"), diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index a1c35a8f..0b2d1f27 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -51,7 +51,7 @@ pub struct PaimonTableScan { /// Paimon splits that DataFusion partition `i` will read. /// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in `execute()`. planned_partitions: Vec>, - plan_properties: PlanProperties, + plan_properties: Arc, /// Optional limit on the number of rows to return. limit: Option, } @@ -65,12 +65,12 @@ impl PaimonTableScan { planned_partitions: Vec>, limit: Option, ) -> Self { - let plan_properties = PlanProperties::new( + let plan_properties = Arc::new(PlanProperties::new( EquivalenceProperties::new(schema.clone()), Partitioning::UnknownPartitioning(planned_partitions.len()), EmissionType::Incremental, Boundedness::Bounded, - ); + )); Self { table, projected_columns, @@ -109,7 +109,7 @@ impl ExecutionPlan for PaimonTableScan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } @@ -168,10 +168,6 @@ impl ExecutionPlan for PaimonTableScan { ))) } - fn statistics(&self) -> DFResult { - self.partition_statistics(None) - } - fn partition_statistics(&self, partition: Option) -> DFResult { let partitions: &[Arc<[DataSplit]>] = match partition { Some(idx) => std::slice::from_ref(&self.planned_partitions[idx]), diff --git a/crates/integrations/datafusion/src/relation_planner.rs b/crates/integrations/datafusion/src/relation_planner.rs index 87198a46..80b46659 100644 --- a/crates/integrations/datafusion/src/relation_planner.rs +++ b/crates/integrations/datafusion/src/relation_planner.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS OF`. +//! Custom [`RelationPlanner`] for Paimon time travel via `VERSION AS OF` and `TIMESTAMP AS OF`. use std::collections::HashMap; use std::fmt::Debug; @@ -29,16 +29,16 @@ use datafusion::logical_expr::planner::{ PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning, }; use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion}; -use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TAG_NAME_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION}; +use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION}; use crate::table::PaimonTableProvider; -/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses -/// on Paimon tables and resolves them to time travel options. +/// A [`RelationPlanner`] that intercepts `VERSION AS OF` and `TIMESTAMP AS OF` +/// clauses on Paimon tables and resolves them to time travel options. /// -/// - Integer literal → sets `scan.snapshot-id` option on the table. -/// - String literal (timestamp) → parsed as a timestamp, sets `scan.timestamp-millis` option. -/// - String literal (other) → sets `scan.tag-name` option on the table. +/// - `VERSION AS OF ` → sets `scan.version` option on the table. +/// At scan time, the version is resolved: tag name (if exists) → snapshot id → error. +/// - `TIMESTAMP AS OF ` → parsed as a timestamp, sets `scan.timestamp-millis`. #[derive(Debug)] pub struct PaimonRelationPlanner; @@ -67,12 +67,13 @@ impl RelationPlanner for PaimonRelationPlanner { .. } = relation else { - return Ok(RelationPlanning::Original(relation)); + return Ok(RelationPlanning::Original(Box::new(relation))); }; - let version_expr = match version { - Some(TableVersion::ForSystemTimeAsOf(expr)) => expr.clone(), - _ => return Ok(RelationPlanning::Original(relation)), + let extra_options = match version { + Some(TableVersion::VersionAsOf(expr)) => resolve_version_as_of(expr)?, + Some(TableVersion::TimestampAsOf(expr)) => resolve_timestamp_as_of(expr)?, + _ => return Ok(RelationPlanning::Original(Box::new(relation))), }; // Resolve the table reference. @@ -84,10 +85,9 @@ impl RelationPlanner for PaimonRelationPlanner { // Check if this is a Paimon table. let Some(paimon_provider) = provider.as_any().downcast_ref::() else { - return Ok(RelationPlanning::Original(relation)); + return Ok(RelationPlanning::Original(Box::new(relation))); }; - let extra_options = resolve_time_travel_options(&version_expr)?; let new_table = paimon_provider.table().copy_with_options(extra_options); let new_provider = PaimonTableProvider::try_new(new_table)?; let new_source = provider_as_source(Arc::new(new_provider)); @@ -98,7 +98,9 @@ impl RelationPlanner for PaimonRelationPlanner { }; let plan = LogicalPlanBuilder::scan(table_ref, new_source, None)?.build()?; - Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias))) + Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new( + plan, alias, + )))) } } @@ -136,45 +138,47 @@ fn object_name_to_table_reference( } } -/// Resolve `FOR SYSTEM_TIME AS OF ` into table options. +/// Resolve `VERSION AS OF ` into `scan.version` option. /// -/// - Integer literal → `{"scan.snapshot-id": "N"}` -/// - String literal (timestamp `YYYY-MM-DD HH:MM:SS`) → `{"scan.timestamp-millis": "M"}` -/// - String literal (other) → `{"scan.tag-name": "S"}` -fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult> { +/// The raw value (integer or string) is passed through as-is. +/// Resolution (tag vs snapshot id) happens at scan time in `TableScan`. +fn resolve_version_as_of(expr: &ast::Expr) -> DFResult> { + let version = match expr { + ast::Expr::Value(v) => match &v.value { + ast::Value::Number(n, _) => n.clone(), + ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => s.clone(), + _ => { + return Err(datafusion::error::DataFusionError::Plan(format!( + "Unsupported VERSION AS OF expression: {expr}" + ))) + } + }, + _ => { + return Err(datafusion::error::DataFusionError::Plan(format!( + "Unsupported VERSION AS OF expression: {expr}. Expected an integer snapshot id or a tag name." + ))) + } + }; + Ok(HashMap::from([(SCAN_VERSION_OPTION.to_string(), version)])) +} + +/// Resolve `TIMESTAMP AS OF ` into `scan.timestamp-millis` option. +fn resolve_timestamp_as_of(expr: &ast::Expr) -> DFResult> { match expr { ast::Expr::Value(v) => match &v.value { - ast::Value::Number(n, _) => { - // Validate it's a valid integer - n.parse::().map_err(|e| { - datafusion::error::DataFusionError::Plan(format!( - "Invalid snapshot id '{n}': {e}" - )) - })?; + ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => { + let millis = parse_timestamp_to_millis(s)?; Ok(HashMap::from([( - SCAN_SNAPSHOT_ID_OPTION.to_string(), - n.clone(), + SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), + millis.to_string(), )])) } - ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => { - // Try parsing as timestamp first; fall back to tag name. - match parse_timestamp_to_millis(s) { - Ok(timestamp_millis) => Ok(HashMap::from([( - SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), - timestamp_millis.to_string(), - )])), - Err(_) => Ok(HashMap::from([( - SCAN_TAG_NAME_OPTION.to_string(), - s.clone(), - )])), - } - } _ => Err(datafusion::error::DataFusionError::Plan(format!( - "Unsupported time travel expression: {expr}" + "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string." ))), }, _ => Err(datafusion::error::DataFusionError::Plan(format!( - "Unsupported time travel expression: {expr}. Expected an integer snapshot id, a timestamp string, or a tag name." + "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string." ))), } } diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 4fc26d6d..3b5df63c 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -383,10 +383,10 @@ async fn test_missing_database_returns_no_schema() { // ======================= Time Travel Tests ======================= /// Helper: create a SessionContext with catalog + relation planner for time travel. -/// Uses BigQuery dialect to enable `FOR SYSTEM_TIME AS OF` syntax. +/// Uses Databricks dialect to enable `VERSION AS OF` and `TIMESTAMP AS OF` syntax. async fn create_time_travel_context() -> SessionContext { let catalog = create_catalog(); - let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery"); + let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "Databricks"); let ctx = SessionContext::new_with_config(config); ctx.register_catalog( "paimon", @@ -403,7 +403,7 @@ async fn test_time_travel_by_snapshot_id() { // Snapshot 1: should contain only the first insert (alice, bob) let batches = ctx - .sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 1") + .sql("SELECT id, name FROM paimon.default.time_travel_table VERSION AS OF 1") .await .expect("time travel query should parse") .collect() @@ -420,7 +420,7 @@ async fn test_time_travel_by_snapshot_id() { // Snapshot 2 (latest): should contain all rows let batches = ctx - .sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 2") + .sql("SELECT id, name FROM paimon.default.time_travel_table VERSION AS OF 2") .await .expect("time travel query should parse") .collect() @@ -443,11 +443,21 @@ async fn test_time_travel_by_snapshot_id() { #[tokio::test] async fn test_time_travel_by_tag_name() { - let ctx = create_time_travel_context().await; + // Tag-based time travel uses `scan.version` option directly since + // `VERSION AS OF` in SQL only accepts numeric values. + let provider = create_provider_with_options( + "time_travel_table", + HashMap::from([("scan.version".to_string(), "snapshot1".to_string())]), + ) + .await; + + let ctx = SessionContext::new(); + ctx.register_table("time_travel_table", Arc::new(provider)) + .expect("Failed to register table"); // Tag 'snapshot1' points to snapshot 1: should contain only (alice, bob) let batches = ctx - .sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 'snapshot1'") + .sql("SELECT id, name FROM time_travel_table") .await .expect("tag time travel query should parse") .collect() @@ -463,8 +473,18 @@ async fn test_time_travel_by_tag_name() { ); // Tag 'snapshot2' points to snapshot 2: should contain all rows - let batches = ctx - .sql("SELECT id, name FROM paimon.default.time_travel_table FOR SYSTEM_TIME AS OF 'snapshot2'") + let provider2 = create_provider_with_options( + "time_travel_table", + HashMap::from([("scan.version".to_string(), "snapshot2".to_string())]), + ) + .await; + + let ctx2 = SessionContext::new(); + ctx2.register_table("time_travel_table", Arc::new(provider2)) + .expect("Failed to register table"); + + let batches = ctx2 + .sql("SELECT id, name FROM time_travel_table") .await .expect("tag time travel query should parse") .collect() @@ -489,11 +509,11 @@ async fn test_time_travel_by_tag_name() { async fn test_time_travel_conflicting_selectors_fail() { let provider = create_provider_with_options( "time_travel_table", - HashMap::from([("scan.tag-name".to_string(), "snapshot1".to_string())]), + HashMap::from([("scan.timestamp-millis".to_string(), "1234".to_string())]), ) .await; - let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery"); + let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "Databricks"); let ctx = SessionContext::new_with_config(config); ctx.register_table("time_travel_table", Arc::new(provider)) .expect("Failed to register table"); @@ -501,7 +521,7 @@ async fn test_time_travel_conflicting_selectors_fail() { .expect("Failed to register relation planner"); let err = ctx - .sql("SELECT id, name FROM time_travel_table FOR SYSTEM_TIME AS OF 2") + .sql("SELECT id, name FROM time_travel_table VERSION AS OF 2") .await .expect("time travel query should parse") .collect() @@ -514,20 +534,16 @@ async fn test_time_travel_conflicting_selectors_fail() { "unexpected conflict error: {message}" ); assert!( - message.contains("scan.snapshot-id"), - "conflict error should mention scan.snapshot-id: {message}" - ); - assert!( - message.contains("scan.tag-name"), - "conflict error should mention scan.tag-name: {message}" + message.contains("scan.version"), + "conflict error should mention scan.version: {message}" ); } #[tokio::test] -async fn test_time_travel_invalid_numeric_selector_fails() { +async fn test_time_travel_invalid_version_fails() { let provider = create_provider_with_options( "time_travel_table", - HashMap::from([("scan.snapshot-id".to_string(), "not-a-number".to_string())]), + HashMap::from([("scan.version".to_string(), "nonexistent-tag".to_string())]), ) .await; @@ -541,12 +557,12 @@ async fn test_time_travel_invalid_numeric_selector_fails() { .expect("query should parse") .collect() .await - .expect_err("invalid numeric time-travel selector should fail"); + .expect_err("invalid version should fail"); let message = err.to_string(); assert!( - message.contains("Invalid value for scan.snapshot-id"), - "unexpected invalid selector error: {message}" + message.contains("is not a valid tag name or snapshot id"), + "unexpected invalid version error: {message}" ); } diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 10e5fa0d..20d85cc0 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -66,7 +66,7 @@ arrow-ord = { workspace = true } arrow-schema = { workspace = true } futures = "0.3" parquet = { workspace = true, features = ["async", "zstd"] } -orc-rust = "0.7.0" +orc-rust = "0.8.0" async-stream = "0.3.6" reqwest = { version = "0.12", features = ["json"] } # DLF authentication dependencies diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index 17993d9e..ac3741d1 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -37,9 +37,8 @@ const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10; const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000; const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000; const DEFAULT_COMMIT_MAX_RETRY_WAIT_MS: u64 = 10_000; -pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id"; pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis"; -pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name"; +pub const SCAN_VERSION_OPTION: &str = "scan.version"; const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024; const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; @@ -52,11 +51,12 @@ pub struct CoreOptions<'a> { options: &'a HashMap, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum TimeTravelSelector<'a> { - TagName(&'a str), - SnapshotId(i64), TimestampMillis(i64), + /// Raw version string from `VERSION AS OF`. Resolved at scan time: + /// tag name (if tag exists) → snapshot id (if parseable as i64) → error. + Version(&'a str), } impl<'a> CoreOptions<'a> { @@ -133,17 +133,6 @@ impl<'a> CoreOptions<'a> { } } - /// Raw snapshot id accessor for `scan.snapshot-id`. - /// - /// This compatibility accessor is lossy: it returns `None` for absent or - /// invalid values and does not validate selector conflicts. Internal - /// time-travel planning should use `try_time_travel_selector`. - pub fn scan_snapshot_id(&self) -> Option { - self.options - .get(SCAN_SNAPSHOT_ID_OPTION) - .and_then(|v| v.parse().ok()) - } - /// Raw timestamp accessor for `scan.timestamp-millis`. /// /// This compatibility accessor is lossy: it returns `None` for absent or @@ -155,25 +144,14 @@ impl<'a> CoreOptions<'a> { .and_then(|v| v.parse().ok()) } - /// Raw tag name accessor for `scan.tag-name`. - /// - /// This compatibility accessor does not validate selector conflicts. - /// Internal time-travel planning should use `try_time_travel_selector`. - pub fn scan_tag_name(&self) -> Option<&'a str> { - self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str) - } - fn configured_time_travel_selectors(&self) -> Vec<&'static str> { - let mut selectors = Vec::with_capacity(3); - if self.options.contains_key(SCAN_TAG_NAME_OPTION) { - selectors.push(SCAN_TAG_NAME_OPTION); - } - if self.options.contains_key(SCAN_SNAPSHOT_ID_OPTION) { - selectors.push(SCAN_SNAPSHOT_ID_OPTION); - } + let mut selectors = Vec::with_capacity(2); if self.options.contains_key(SCAN_TIMESTAMP_MILLIS_OPTION) { selectors.push(SCAN_TIMESTAMP_MILLIS_OPTION); } + if self.options.contains_key(SCAN_VERSION_OPTION) { + selectors.push(SCAN_VERSION_OPTION); + } selectors } @@ -193,12 +171,10 @@ impl<'a> CoreOptions<'a> { }); } - if let Some(tag_name) = self.scan_tag_name() { - Ok(Some(TimeTravelSelector::TagName(tag_name))) - } else if let Some(id) = self.parse_i64_option(SCAN_SNAPSHOT_ID_OPTION)? { - Ok(Some(TimeTravelSelector::SnapshotId(id))) - } else if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? { + if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? { Ok(Some(TimeTravelSelector::TimestampMillis(ts))) + } else if let Some(version) = self.options.get(SCAN_VERSION_OPTION).map(String::as_str) { + Ok(Some(TimeTravelSelector::Version(version))) } else { Ok(None) } @@ -368,8 +344,8 @@ mod tests { #[test] fn test_try_time_travel_selector_rejects_conflicting_selectors() { let options = HashMap::from([ - (SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string()), - (SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string()), + (SCAN_VERSION_OPTION.to_string(), "tag1".to_string()), + (SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string()), ]); let core = CoreOptions::new(&options); @@ -379,8 +355,8 @@ mod tests { match err { crate::Error::DataInvalid { message, .. } => { assert!(message.contains("Only one time-travel selector may be set")); - assert!(message.contains(SCAN_TAG_NAME_OPTION)); - assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION)); + assert!(message.contains(SCAN_VERSION_OPTION)); + assert!(message.contains(SCAN_TIMESTAMP_MILLIS_OPTION)); } other => panic!("unexpected error: {other:?}"), } @@ -388,20 +364,6 @@ mod tests { #[test] fn test_try_time_travel_selector_rejects_invalid_numeric_values() { - let snapshot_options = - HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), "abc".to_string())]); - let snapshot_core = CoreOptions::new(&snapshot_options); - - let snapshot_err = snapshot_core - .try_time_travel_selector() - .expect_err("invalid snapshot id should fail"); - match snapshot_err { - crate::Error::DataInvalid { message, .. } => { - assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION)); - } - other => panic!("unexpected error: {other:?}"), - } - let timestamp_options = HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "xyz".to_string())]); let timestamp_core = CoreOptions::new(×tamp_options); @@ -450,31 +412,34 @@ mod tests { #[test] fn test_try_time_travel_selector_normalizes_valid_selector() { - let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string())]); - let tag_core = CoreOptions::new(&tag_options); + let timestamp_options = + HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string())]); + let timestamp_core = CoreOptions::new(×tamp_options); assert_eq!( - tag_core.try_time_travel_selector().expect("tag selector"), - Some(TimeTravelSelector::TagName("tag1")) + timestamp_core + .try_time_travel_selector() + .expect("timestamp selector"), + Some(TimeTravelSelector::TimestampMillis(1234)) ); - let snapshot_options = - HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string())]); - let snapshot_core = CoreOptions::new(&snapshot_options); + let version_options = + HashMap::from([(SCAN_VERSION_OPTION.to_string(), "my-tag".to_string())]); + let version_core = CoreOptions::new(&version_options); assert_eq!( - snapshot_core + version_core .try_time_travel_selector() - .expect("snapshot selector"), - Some(TimeTravelSelector::SnapshotId(7)) + .expect("version selector"), + Some(TimeTravelSelector::Version("my-tag")) ); - let timestamp_options = - HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string())]); - let timestamp_core = CoreOptions::new(×tamp_options); + let version_num_options = + HashMap::from([(SCAN_VERSION_OPTION.to_string(), "3".to_string())]); + let version_num_core = CoreOptions::new(&version_num_options); assert_eq!( - timestamp_core + version_num_core .try_time_travel_selector() - .expect("timestamp selector"), - Some(TimeTravelSelector::TimestampMillis(1234)) + .expect("version numeric selector"), + Some(TimeTravelSelector::Version("3")) ); } } diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 06fe87a0..c6bfea47 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -345,9 +345,8 @@ impl<'a> TableScan<'a> { /// Plan the full scan: resolve snapshot (via options or latest), then read manifests and build DataSplits. /// /// Time travel is resolved from table options: - /// - only one of `scan.tag-name`, `scan.snapshot-id`, `scan.timestamp-millis` may be set - /// - `scan.tag-name` → read the snapshot pinned by that tag - /// - `scan.snapshot-id` → read that specific snapshot + /// - only one of `scan.version`, `scan.timestamp-millis` may be set + /// - `scan.version` → tag name (if exists) → snapshot id (if parseable) → error /// - `scan.timestamp-millis` → find the latest snapshot <= that timestamp /// - otherwise → read the latest snapshot /// @@ -367,19 +366,6 @@ impl<'a> TableScan<'a> { let core_options = CoreOptions::new(self.table.schema().options()); match core_options.try_time_travel_selector()? { - Some(TimeTravelSelector::TagName(tag_name)) => { - let tag_manager = TagManager::new(file_io.clone(), table_path.to_string()); - match tag_manager.get(tag_name).await? { - Some(s) => Ok(Some(s)), - None => Err(Error::DataInvalid { - message: format!("Tag '{tag_name}' doesn't exist."), - source: None, - }), - } - } - Some(TimeTravelSelector::SnapshotId(id)) => { - snapshot_manager.get_snapshot(id).await.map(Some) - } Some(TimeTravelSelector::TimestampMillis(ts)) => { match snapshot_manager.earlier_or_equal_time_mills(ts).await? { Some(s) => Ok(Some(s)), @@ -389,6 +375,26 @@ impl<'a> TableScan<'a> { }), } } + Some(TimeTravelSelector::Version(v)) => { + // Tag first, then snapshot id, else error. + let tag_manager = TagManager::new(file_io.clone(), table_path.to_string()); + if tag_manager.tag_exists(v).await? { + match tag_manager.get(v).await? { + Some(s) => Ok(Some(s)), + None => Err(Error::DataInvalid { + message: format!("Tag '{v}' doesn't exist."), + source: None, + }), + } + } else if let Ok(id) = v.parse::() { + snapshot_manager.get_snapshot(id).await.map(Some) + } else { + Err(Error::DataInvalid { + message: format!("Version '{v}' is not a valid tag name or snapshot id."), + source: None, + }) + } + } None => snapshot_manager.get_latest_snapshot().await, } } diff --git a/crates/test_utils.rs b/crates/test_utils.rs index cc881921..4813cc53 100644 --- a/crates/test_utils.rs +++ b/crates/test_utils.rs @@ -45,7 +45,7 @@ pub(crate) fn write_int_parquet_file( let props = max_row_group_size.map(|size| { WriterProperties::builder() - .set_max_row_group_size(size) + .set_max_row_group_row_count(Some(size)) .build() }); let file = File::create(path).unwrap(); diff --git a/docs/src/datafusion.md b/docs/src/datafusion.md index 18930b64..df8c4b8a 100644 --- a/docs/src/datafusion.md +++ b/docs/src/datafusion.md @@ -27,7 +27,7 @@ under the License. [dependencies] paimon = "0.1.0" paimon-datafusion = "0.1.0" -datafusion = "52" +datafusion = "53" tokio = { version = "1", features = ["full"] } ``` @@ -49,41 +49,43 @@ df.show().await?; ## Time Travel -Paimon supports time travel queries to read historical data. In DataFusion, this is done via the `FOR SYSTEM_TIME AS OF` clause. +Paimon supports time travel queries to read historical data. In DataFusion, this is done via the `VERSION AS OF` and `TIMESTAMP AS OF` clauses. ### By Snapshot ID -Read data from a specific snapshot by passing an integer literal: +Read data from a specific snapshot by passing an integer: ```sql -SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1 +SELECT * FROM paimon.default.my_table VERSION AS OF 1 ``` -This sets the `scan.snapshot-id` option and reads exactly that snapshot. +This sets the `scan.version` option. At scan time, the value is resolved as a snapshot ID. -### By Timestamp +### By Tag Name -Read data as of a specific point in time by passing a timestamp string in `YYYY-MM-DD HH:MM:SS` format: +Read data from a named tag. Since `VERSION AS OF` in SQL only accepts numeric values, tag-based time travel is done via the `scan.version` table option: -```sql -SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF '2024-01-01 00:00:00' +```rust +let table = table.copy_with_options(HashMap::from([ + ("scan.version".to_string(), "my_tag".to_string()), +])); ``` -This finds the latest snapshot whose commit time is less than or equal to the given timestamp. The timestamp is interpreted in the local timezone. +At scan time, the version value is resolved by first checking if a tag with that name exists, then trying to parse it as a snapshot ID. If neither matches, an error is returned. -### By Tag Name +### By Timestamp -Read data from a named tag by passing a string that is not a timestamp: +Read data as of a specific point in time by passing a timestamp string in `YYYY-MM-DD HH:MM:SS` format: ```sql -SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 'my_tag' +SELECT * FROM paimon.default.my_table TIMESTAMP AS OF '2024-01-01 00:00:00' ``` -Tags are named snapshots created via Paimon's tag management (e.g., `CALL sys.create_tag(...)` in Spark). This is useful for pinning a stable version of the data for reproducible queries. +This finds the latest snapshot whose commit time is less than or equal to the given timestamp. The timestamp is interpreted in the local timezone. ### Enabling Time Travel Syntax -DataFusion requires the BigQuery SQL dialect to parse `FOR SYSTEM_TIME AS OF`. You also need to register the `PaimonRelationPlanner`: +DataFusion requires the Databricks SQL dialect to parse `VERSION AS OF` and `TIMESTAMP AS OF`. You also need to register the `PaimonRelationPlanner`: ```rust use std::sync::Arc; @@ -91,12 +93,12 @@ use datafusion::prelude::{SessionConfig, SessionContext}; use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner}; let config = SessionConfig::new() - .set_str("datafusion.sql_parser.dialect", "BigQuery"); + .set_str("datafusion.sql_parser.dialect", "Databricks"); let ctx = SessionContext::new_with_config(config); ctx.register_catalog("paimon", Arc::new(PaimonCatalogProvider::new(Arc::new(catalog)))); ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))?; // Now time travel queries work -let df = ctx.sql("SELECT * FROM paimon.default.my_table FOR SYSTEM_TIME AS OF 1").await?; +let df = ctx.sql("SELECT * FROM paimon.default.my_table VERSION AS OF 1").await?; ```