From 00f6e19e03e506eab7b18b746c405d1a14d5dd5d Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 11 Apr 2026 17:25:06 +0800 Subject: [PATCH] feat(datafusion): Add DDL support with PRIMARY KEY constraint syntax - Add PaimonDdlHandler for CREATE TABLE, ALTER TABLE (ADD/DROP/RENAME COLUMN, RENAME TABLE) - Add PaimonTableFactory for CREATE EXTERNAL TABLE via DataFusion TableProviderFactory - Extend PaimonCatalogProvider/SchemaProvider with CREATE/DROP SCHEMA and DROP TABLE - Add arrow_to_paimon_type and arrow_fields_to_paimon to paimon::arrow for Arrow-to-Paimon type conversion - Support PRIMARY KEY (col, ...) constraint syntax in CREATE TABLE DDL --- crates/integrations/datafusion/src/catalog.rs | 78 ++++ crates/integrations/datafusion/src/ddl.rs | 387 +++++++++++++++++ crates/integrations/datafusion/src/lib.rs | 4 + .../datafusion/src/table_factory.rs | 137 ++++++ .../datafusion/tests/ddl_tests.rs | 402 ++++++++++++++++++ crates/paimon/src/arrow/mod.rs | 324 +++++++++++++- 6 files changed, 1331 insertions(+), 1 deletion(-) create mode 100644 crates/integrations/datafusion/src/ddl.rs create mode 100644 crates/integrations/datafusion/src/table_factory.rs create mode 100644 crates/integrations/datafusion/tests/ddl_tests.rs diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index 626a47f3..be75ec44 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -18,6 +18,7 @@ //! Paimon catalog integration for DataFusion. use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -86,6 +87,50 @@ impl CatalogProvider for PaimonCatalogProvider { "paimon catalog access thread panicked", ) } + + fn register_schema( + &self, + name: &str, + _schema: Arc, + ) -> DFResult>> { + let catalog = Arc::clone(&self.catalog); + let name = name.to_string(); + block_on_with_runtime( + async move { + catalog + .create_database(&name, false, HashMap::new()) + .await + .map_err(to_datafusion_error)?; + Ok(Some( + Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name)) + as Arc, + )) + }, + "paimon catalog access thread panicked", + ) + } + + fn deregister_schema( + &self, + name: &str, + cascade: bool, + ) -> DFResult>> { + let catalog = Arc::clone(&self.catalog); + let name = name.to_string(); + block_on_with_runtime( + async move { + catalog + .drop_database(&name, false, cascade) + .await + .map_err(to_datafusion_error)?; + Ok(Some( + Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name)) + as Arc, + )) + }, + "paimon catalog access thread panicked", + ) + } } /// Represents a [`SchemaProvider`] for the Paimon [`Catalog`], managing @@ -159,4 +204,37 @@ impl SchemaProvider for PaimonSchemaProvider { "paimon catalog access thread panicked", ) } + + fn register_table( + &self, + _name: String, + table: Arc, + ) -> DFResult>> { + // The table is already created in the Paimon catalog by PaimonTableFactory. + // DataFusion calls register_table after the factory returns, so we just + // acknowledge it here. + Ok(Some(table)) + } + + fn deregister_table(&self, name: &str) -> DFResult>> { + let catalog = Arc::clone(&self.catalog); + let identifier = Identifier::new(self.database.clone(), name); + block_on_with_runtime( + async move { + // Try to get the table first so we can return it. + let table = match catalog.get_table(&identifier).await { + Ok(t) => t, + Err(paimon::Error::TableNotExist { .. }) => return Ok(None), + Err(e) => return Err(to_datafusion_error(e)), + }; + let provider = PaimonTableProvider::try_new(table)?; + catalog + .drop_table(&identifier, false) + .await + .map_err(to_datafusion_error)?; + Ok(Some(Arc::new(provider) as Arc)) + }, + "paimon catalog access thread panicked", + ) + } } diff --git a/crates/integrations/datafusion/src/ddl.rs b/crates/integrations/datafusion/src/ddl.rs new file mode 100644 index 00000000..02a319f4 --- /dev/null +++ b/crates/integrations/datafusion/src/ddl.rs @@ -0,0 +1,387 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DDL support for Paimon tables. +//! +//! DataFusion does not natively support all DDL statements needed by Paimon. +//! This module provides [`PaimonDdlHandler`] which intercepts CREATE TABLE and +//! ALTER TABLE SQL, translates them to Paimon catalog operations, and delegates +//! everything else (SELECT, CREATE/DROP SCHEMA, DROP TABLE, etc.) to the +//! underlying [`SessionContext`]. +//! +//! Supported DDL: +//! - `CREATE TABLE db.t (col TYPE, ..., PRIMARY KEY (col, ...)) [PARTITIONED BY (col TYPE, ...)] [WITH ('key' = 'val')]` +//! - `ALTER TABLE db.t ADD COLUMN col TYPE` +//! - `ALTER TABLE db.t DROP COLUMN col` +//! - `ALTER TABLE db.t RENAME COLUMN old TO new` +//! - `ALTER TABLE db.t RENAME TO new_name` + +use std::sync::Arc; + +use datafusion::arrow::array::StringArray; +use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::prelude::{DataFrame, SessionContext}; +use datafusion::sql::sqlparser::ast::{ + AlterTableOperation, ColumnDef, CreateTable, CreateTableOptions, HiveDistributionStyle, + ObjectName, RenameTableNameKind, SqlOption, Statement, +}; +use datafusion::sql::sqlparser::dialect::GenericDialect; +use datafusion::sql::sqlparser::parser::Parser; +use paimon::catalog::{Catalog, Identifier}; +use paimon::spec::SchemaChange; + +use crate::error::to_datafusion_error; +use paimon::arrow::arrow_to_paimon_type; + +/// Wraps a [`SessionContext`] and a Paimon [`Catalog`] to handle DDL statements +/// that DataFusion does not natively support (e.g. ALTER TABLE). +/// +/// For all other SQL, it delegates to the inner `SessionContext`. +/// +/// # Example +/// ```ignore +/// let handler = PaimonDdlHandler::new(ctx, catalog); +/// let df = handler.sql("ALTER TABLE paimon.db.t ADD COLUMN age INT").await?; +/// ``` +pub struct PaimonDdlHandler { + ctx: SessionContext, + catalog: Arc, + /// The catalog name registered in the SessionContext (used to strip the catalog prefix). + catalog_name: String, +} + +impl PaimonDdlHandler { + pub fn new( + ctx: SessionContext, + catalog: Arc, + catalog_name: impl Into, + ) -> Self { + Self { + ctx, + catalog, + catalog_name: catalog_name.into(), + } + } + + /// Returns a reference to the inner [`SessionContext`]. + pub fn ctx(&self) -> &SessionContext { + &self.ctx + } + + /// Execute a SQL statement. ALTER TABLE is handled by Paimon directly; + /// everything else is delegated to DataFusion. + pub async fn sql(&self, sql: &str) -> DFResult { + let dialect = GenericDialect {}; + let statements = Parser::parse_sql(&dialect, sql) + .map_err(|e| DataFusionError::Plan(format!("SQL parse error: {e}")))?; + + if statements.len() != 1 { + return Err(DataFusionError::Plan( + "Expected exactly one SQL statement".to_string(), + )); + } + + match &statements[0] { + Statement::CreateTable(create_table) => self.handle_create_table(create_table).await, + Statement::AlterTable { + name, operations, .. + } => self.handle_alter_table(name, operations).await, + _ => self.ctx.sql(sql).await, + } + } + + async fn handle_create_table(&self, ct: &CreateTable) -> DFResult { + if ct.location.is_some() { + return Err(DataFusionError::Plan( + "LOCATION is not supported for Paimon tables. Table path is determined by the catalog warehouse.".to_string(), + )); + } + if ct.query.is_some() { + return Err(DataFusionError::Plan( + "CREATE TABLE AS SELECT is not yet supported for Paimon tables.".to_string(), + )); + } + + let identifier = self.resolve_table_name(&ct.name)?; + + let mut builder = paimon::spec::Schema::builder(); + + // Columns + for col in &ct.columns { + let arrow_type = sql_data_type_to_arrow(&col.data_type)?; + let nullable = !col.options.iter().any(|opt| { + matches!( + opt.option, + datafusion::sql::sqlparser::ast::ColumnOption::NotNull + ) + }); + let paimon_type = + arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?; + builder = builder.column(col.name.value.clone(), paimon_type); + } + + // Primary key from constraints: PRIMARY KEY (col, ...) + for constraint in &ct.constraints { + if let datafusion::sql::sqlparser::ast::TableConstraint::PrimaryKey { + columns, .. + } = constraint + { + let pk_cols: Vec = + columns.iter().map(|c| c.column.expr.to_string()).collect(); + builder = builder.primary_key(pk_cols); + } + } + + // Partition keys from PARTITIONED BY (col, ...) + if let HiveDistributionStyle::PARTITIONED { columns } = &ct.hive_distribution { + let partition_keys: Vec = + columns.iter().map(|c| c.name.value.clone()).collect(); + builder = builder.partition_keys(partition_keys); + } + + // Table options from WITH ('key' = 'value', ...) + for (k, v) in extract_options(&ct.table_options)? { + builder = builder.option(k, v); + } + + let schema = builder.build().map_err(to_datafusion_error)?; + + self.catalog + .create_table(&identifier, schema, ct.if_not_exists) + .await + .map_err(to_datafusion_error)?; + + ok_result(&self.ctx) + } + + async fn handle_alter_table( + &self, + name: &ObjectName, + operations: &[AlterTableOperation], + ) -> DFResult { + let identifier = self.resolve_table_name(name)?; + + let mut changes = Vec::new(); + let mut rename_to: Option = None; + + for op in operations { + match op { + AlterTableOperation::AddColumn { column_def, .. } => { + let change = column_def_to_add_column(column_def)?; + changes.push(change); + } + AlterTableOperation::DropColumn { + column_names, + if_exists: _, + .. + } => { + for col in column_names { + changes.push(SchemaChange::drop_column(col.value.clone())); + } + } + AlterTableOperation::RenameColumn { + old_column_name, + new_column_name, + } => { + changes.push(SchemaChange::rename_column( + old_column_name.value.clone(), + new_column_name.value.clone(), + )); + } + AlterTableOperation::RenameTable { table_name } => { + let new_name = match table_name { + RenameTableNameKind::To(name) | RenameTableNameKind::As(name) => { + object_name_to_string(name) + } + }; + rename_to = Some(Identifier::new(identifier.database().to_string(), new_name)); + } + other => { + return Err(DataFusionError::Plan(format!( + "Unsupported ALTER TABLE operation: {other}" + ))); + } + } + } + + if let Some(new_identifier) = rename_to { + self.catalog + .rename_table(&identifier, &new_identifier, false) + .await + .map_err(to_datafusion_error)?; + } + + if !changes.is_empty() { + self.catalog + .alter_table(&identifier, changes, false) + .await + .map_err(to_datafusion_error)?; + } + + ok_result(&self.ctx) + } + + /// Resolve an ObjectName like `paimon.db.table` or `db.table` to a Paimon Identifier. + fn resolve_table_name(&self, name: &ObjectName) -> DFResult { + let parts: Vec = name + .0 + .iter() + .filter_map(|p| p.as_ident().map(|id| id.value.clone())) + .collect(); + match parts.len() { + 3 => { + // catalog.database.table — strip catalog prefix + if parts[0] != self.catalog_name { + return Err(DataFusionError::Plan(format!( + "Unknown catalog '{}', expected '{}'", + parts[0], self.catalog_name + ))); + } + Ok(Identifier::new(parts[1].clone(), parts[2].clone())) + } + 2 => Ok(Identifier::new(parts[0].clone(), parts[1].clone())), + 1 => Err(DataFusionError::Plan(format!( + "ALTER TABLE requires at least database.table, got: {}", + parts[0] + ))), + _ => Err(DataFusionError::Plan(format!( + "Invalid table reference: {name}" + ))), + } + } +} + +/// Convert a sqlparser [`ColumnDef`] to a Paimon [`SchemaChange::AddColumn`]. +fn column_def_to_add_column(col: &ColumnDef) -> DFResult { + let arrow_type = sql_data_type_to_arrow(&col.data_type)?; + let nullable = !col.options.iter().any(|opt| { + matches!( + opt.option, + datafusion::sql::sqlparser::ast::ColumnOption::NotNull + ) + }); + let paimon_type = arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?; + Ok(SchemaChange::add_column( + col.name.value.clone(), + paimon_type, + )) +} + +/// Minimal conversion from sqlparser SQL data types to Arrow data types. +fn sql_data_type_to_arrow( + sql_type: &datafusion::sql::sqlparser::ast::DataType, +) -> DFResult { + use datafusion::sql::sqlparser::ast::DataType as SqlType; + match sql_type { + SqlType::Boolean => Ok(ArrowDataType::Boolean), + SqlType::TinyInt(_) => Ok(ArrowDataType::Int8), + SqlType::SmallInt(_) => Ok(ArrowDataType::Int16), + SqlType::Int(_) | SqlType::Integer(_) => Ok(ArrowDataType::Int32), + SqlType::BigInt(_) => Ok(ArrowDataType::Int64), + SqlType::Float(_) => Ok(ArrowDataType::Float32), + SqlType::Real => Ok(ArrowDataType::Float32), + SqlType::Double(_) | SqlType::DoublePrecision => Ok(ArrowDataType::Float64), + SqlType::Varchar(_) | SqlType::CharVarying(_) | SqlType::Text | SqlType::String(_) => { + Ok(ArrowDataType::Utf8) + } + SqlType::Char(_) | SqlType::Character(_) => Ok(ArrowDataType::Utf8), + SqlType::Binary(_) | SqlType::Varbinary(_) | SqlType::Blob(_) | SqlType::Bytea => { + Ok(ArrowDataType::Binary) + } + SqlType::Date => Ok(ArrowDataType::Date32), + SqlType::Timestamp(precision, tz_info) => { + use datafusion::sql::sqlparser::ast::TimezoneInfo; + let unit = match precision { + Some(0) => datafusion::arrow::datatypes::TimeUnit::Second, + Some(1..=3) | None => datafusion::arrow::datatypes::TimeUnit::Millisecond, + Some(4..=6) => datafusion::arrow::datatypes::TimeUnit::Microsecond, + _ => datafusion::arrow::datatypes::TimeUnit::Nanosecond, + }; + let tz = match tz_info { + TimezoneInfo::None | TimezoneInfo::WithoutTimeZone => None, + _ => Some("UTC".into()), + }; + Ok(ArrowDataType::Timestamp(unit, tz)) + } + SqlType::Decimal(info) => { + use datafusion::sql::sqlparser::ast::ExactNumberInfo; + let (p, s) = match info { + ExactNumberInfo::PrecisionAndScale(p, s) => (*p as u8, *s as i8), + ExactNumberInfo::Precision(p) => (*p as u8, 0), + ExactNumberInfo::None => (10, 0), + }; + Ok(ArrowDataType::Decimal128(p, s)) + } + _ => Err(DataFusionError::Plan(format!( + "Unsupported SQL data type for ALTER TABLE: {sql_type}" + ))), + } +} + +fn object_name_to_string(name: &ObjectName) -> String { + name.0 + .iter() + .filter_map(|p| p.as_ident().map(|id| id.value.clone())) + .collect::>() + .join(".") +} + +/// Extract key-value pairs from [`CreateTableOptions`]. +fn extract_options(opts: &CreateTableOptions) -> DFResult> { + let sql_options = match opts { + CreateTableOptions::With(options) + | CreateTableOptions::Options(options) + | CreateTableOptions::TableProperties(options) + | CreateTableOptions::Plain(options) => options, + CreateTableOptions::None => return Ok(Vec::new()), + }; + sql_options + .iter() + .map(|opt| match opt { + SqlOption::KeyValue { key, value } => { + let v = value.to_string(); + // Strip surrounding quotes from the value if present. + let v = v + .strip_prefix('\'') + .and_then(|s| s.strip_suffix('\'')) + .unwrap_or(&v) + .to_string(); + Ok((key.value.clone(), v)) + } + other => Err(DataFusionError::Plan(format!( + "Unsupported table option: {other}" + ))), + }) + .collect() +} + +/// Return an empty DataFrame with a single "result" column containing "OK". +fn ok_result(ctx: &SessionContext) -> DFResult { + let schema = Arc::new(Schema::new(vec![Field::new( + "result", + ArrowDataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(vec!["OK"]))], + )?; + let df = ctx.read_batch(batch)?; + Ok(df) +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index abcf7448..d48b809b 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -37,6 +37,7 @@ //! translatable partition-only conjuncts from DataFusion filters. mod catalog; +mod ddl; mod error; mod filter_pushdown; #[cfg(feature = "fulltext")] @@ -45,11 +46,14 @@ mod physical_plan; mod relation_planner; pub mod runtime; mod table; +mod table_factory; pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; +pub use ddl::PaimonDdlHandler; pub use error::to_datafusion_error; #[cfg(feature = "fulltext")] pub use full_text_search::{register_full_text_search, FullTextSearchFunction}; pub use physical_plan::PaimonTableScan; pub use relation_planner::PaimonRelationPlanner; pub use table::PaimonTableProvider; +pub use table_factory::PaimonTableFactory; diff --git a/crates/integrations/datafusion/src/table_factory.rs b/crates/integrations/datafusion/src/table_factory.rs new file mode 100644 index 00000000..d6e12c62 --- /dev/null +++ b/crates/integrations/datafusion/src/table_factory.rs @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`TableProviderFactory`] implementation for creating Paimon tables via +//! `CREATE EXTERNAL TABLE`. + +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::catalog::{Session, TableProvider, TableProviderFactory}; +use datafusion::common::TableReference; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::logical_expr::CreateExternalTable; +use paimon::catalog::{Catalog, Identifier}; +use paimon::spec::Schema; + +use crate::error::to_datafusion_error; +use crate::table::PaimonTableProvider; +use paimon::arrow::arrow_to_paimon_type; + +/// A [`TableProviderFactory`] that creates Paimon tables. +/// +/// Register with: +/// ```ignore +/// ctx.state_mut().table_factories_mut() +/// .insert("PAIMON".to_string(), Arc::new(PaimonTableFactory::new(catalog))); +/// ``` +/// +/// Then use: +/// ```sql +/// CREATE EXTERNAL TABLE paimon.my_db.my_table ( +/// id INT NOT NULL, +/// name STRING, +/// dt STRING, +/// PRIMARY KEY (id, dt) +/// ) PARTITIONED BY (dt) +/// WITH ('bucket' = '2'); +/// ``` +pub struct PaimonTableFactory { + catalog: Arc, +} + +impl Debug for PaimonTableFactory { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PaimonTableFactory").finish() + } +} + +impl PaimonTableFactory { + pub fn new(catalog: Arc) -> Self { + Self { catalog } + } +} + +#[async_trait] +impl TableProviderFactory for PaimonTableFactory { + async fn create( + &self, + _state: &dyn Session, + cmd: &CreateExternalTable, + ) -> DFResult> { + if !cmd.location.is_empty() { + return Err(DataFusionError::Plan( + "LOCATION is not supported for Paimon tables. Table path is determined by the catalog warehouse.".to_string(), + )); + } + + let identifier = resolve_identifier(&cmd.name)?; + + // Build Paimon schema from the CREATE EXTERNAL TABLE command. + let arrow_schema = cmd.schema.as_arrow(); + let mut builder = Schema::builder(); + + for field in arrow_schema.fields() { + let paimon_type = arrow_to_paimon_type(field.data_type(), field.is_nullable()) + .map_err(to_datafusion_error)?; + builder = builder.column(field.name().clone(), paimon_type); + } + + if !cmd.table_partition_cols.is_empty() { + builder = builder.partition_keys(cmd.table_partition_cols.clone()); + } + + // Pass all OPTIONS through to Paimon (includes 'bucket', etc.). + // DataFusion prefixes options with "format." — strip that prefix for Paimon. + for (k, v) in &cmd.options { + let key = k.strip_prefix("format.").unwrap_or(k); + builder = builder.option(key.to_string(), v.clone()); + } + + let schema = builder.build().map_err(to_datafusion_error)?; + + self.catalog + .create_table(&identifier, schema, cmd.if_not_exists) + .await + .map_err(to_datafusion_error)?; + + // Return the newly created table as a provider. + let table = self + .catalog + .get_table(&identifier) + .await + .map_err(to_datafusion_error)?; + let provider = PaimonTableProvider::try_new(table)?; + Ok(Arc::new(provider)) + } +} + +/// Extract a Paimon [`Identifier`] (database, table) from a DataFusion [`TableReference`]. +fn resolve_identifier(name: &TableReference) -> DFResult { + match name { + TableReference::Full { + schema, table, .. + } => Ok(Identifier::new(schema.to_string(), table.to_string())), + TableReference::Partial { schema, table } => { + Ok(Identifier::new(schema.to_string(), table.to_string())) + } + TableReference::Bare { table } => Err(DataFusionError::Plan(format!( + "CREATE EXTERNAL TABLE requires a fully qualified name (catalog.database.table or database.table), got: {table}" + ))), + } +} diff --git a/crates/integrations/datafusion/tests/ddl_tests.rs b/crates/integrations/datafusion/tests/ddl_tests.rs new file mode 100644 index 00000000..207583d6 --- /dev/null +++ b/crates/integrations/datafusion/tests/ddl_tests.rs @@ -0,0 +1,402 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DDL integration tests for paimon-datafusion. + +use std::sync::Arc; + +use datafusion::catalog::CatalogProvider; +use datafusion::prelude::SessionContext; +use paimon::catalog::Identifier; +use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; +use paimon_datafusion::{PaimonCatalogProvider, PaimonDdlHandler, PaimonRelationPlanner}; +use tempfile::TempDir; + +fn create_test_env() -> (TempDir, Arc) { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let warehouse = format!("file://{}", temp_dir.path().display()); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = FileSystemCatalog::new(options).expect("Failed to create catalog"); + (temp_dir, Arc::new(catalog)) +} + +fn create_handler(catalog: Arc) -> PaimonDdlHandler { + let ctx = SessionContext::new(); + ctx.register_catalog( + "paimon", + Arc::new(PaimonCatalogProvider::new(catalog.clone())), + ); + ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new())) + .expect("Failed to register relation planner"); + PaimonDdlHandler::new(ctx, catalog, "paimon") +} + +// ======================= CREATE / DROP SCHEMA ======================= + +#[tokio::test] +async fn test_create_schema() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + handler + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA should succeed"); + + let databases = catalog.list_databases().await.unwrap(); + assert!( + databases.contains(&"test_db".to_string()), + "Database test_db should exist after CREATE SCHEMA" + ); +} + +#[tokio::test] +async fn test_drop_schema() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("drop_me", false, Default::default()) + .await + .unwrap(); + + handler + .sql("DROP SCHEMA paimon.drop_me CASCADE") + .await + .expect("DROP SCHEMA should succeed"); + + let databases = catalog.list_databases().await.unwrap(); + assert!( + !databases.contains(&"drop_me".to_string()), + "Database drop_me should not exist after DROP SCHEMA" + ); +} + +#[tokio::test] +async fn test_schema_names_via_catalog_provider() { + let (_tmp, catalog) = create_test_env(); + let provider = PaimonCatalogProvider::new(catalog.clone()); + + catalog + .create_database("db_a", false, Default::default()) + .await + .unwrap(); + catalog + .create_database("db_b", false, Default::default()) + .await + .unwrap(); + + let names = provider.schema_names(); + assert!(names.contains(&"db_a".to_string())); + assert!(names.contains(&"db_b".to_string())); +} + +// ======================= CREATE TABLE ======================= + +#[tokio::test] +async fn test_create_table() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + handler + .sql( + "CREATE TABLE paimon.mydb.users ( + id INT NOT NULL, + name STRING, + age INT, + PRIMARY KEY (id) + )", + ) + .await + .expect("CREATE TABLE should succeed"); + + let tables = catalog.list_tables("mydb").await.unwrap(); + assert!( + tables.contains(&"users".to_string()), + "Table users should exist after CREATE TABLE" + ); + + // Verify schema + let table = catalog + .get_table(&Identifier::new("mydb", "users")) + .await + .unwrap(); + let schema = table.schema(); + assert_eq!(schema.fields().len(), 3); + assert_eq!(schema.primary_keys(), &["id"]); +} + +#[tokio::test] +async fn test_create_table_with_partition() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + handler + .sql( + "CREATE TABLE paimon.mydb.events ( + id INT NOT NULL, + name STRING, + dt STRING, + PRIMARY KEY (id, dt) + ) PARTITIONED BY (dt STRING) + WITH ('bucket' = '2')", + ) + .await + .expect("CREATE TABLE with partition should succeed"); + + let table = catalog + .get_table(&Identifier::new("mydb", "events")) + .await + .unwrap(); + let schema = table.schema(); + assert_eq!(schema.partition_keys(), &["dt"]); + assert_eq!(schema.primary_keys(), &["id", "dt"]); + assert_eq!( + schema.options().get("bucket"), + Some(&"2".to_string()), + "Table option 'bucket' should be preserved" + ); +} + +#[tokio::test] +async fn test_create_table_if_not_exists() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let sql = "CREATE TABLE IF NOT EXISTS paimon.mydb.t1 ( + id INT NOT NULL + )"; + + // First create should succeed + handler.sql(sql).await.expect("First CREATE should succeed"); + + // Second create with IF NOT EXISTS should also succeed + handler + .sql(sql) + .await + .expect("Second CREATE with IF NOT EXISTS should succeed"); +} + +#[tokio::test] +async fn test_create_table_with_location_rejected() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + // EXTERNAL TABLE with LOCATION is the only way sqlparser populates the location field + let result = handler + .sql( + "CREATE EXTERNAL TABLE paimon.mydb.bad ( + id INT NOT NULL + ) STORED AS PARQUET + LOCATION '/some/path'", + ) + .await; + + assert!( + result.is_err(), + "LOCATION should be rejected for Paimon tables" + ); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("LOCATION is not supported"), + "Error should mention LOCATION is not supported, got: {err_msg}" + ); +} + +// ======================= DROP TABLE ======================= + +#[tokio::test] +async fn test_drop_table() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + // Create a table first + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "to_drop"), schema, false) + .await + .unwrap(); + + assert!(catalog + .list_tables("mydb") + .await + .unwrap() + .contains(&"to_drop".to_string())); + + handler + .sql("DROP TABLE paimon.mydb.to_drop") + .await + .expect("DROP TABLE should succeed"); + + assert!( + !catalog + .list_tables("mydb") + .await + .unwrap() + .contains(&"to_drop".to_string()), + "Table should not exist after DROP TABLE" + ); +} + +// ======================= ALTER TABLE ======================= + +#[tokio::test] +async fn test_alter_table_add_column() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .column( + "name", + paimon::spec::DataType::VarChar(paimon::spec::VarCharType::string_type()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "alter_test"), schema, false) + .await + .unwrap(); + + // ALTER TABLE is not yet implemented in FileSystemCatalog, so we expect an error + let result = handler + .sql("ALTER TABLE paimon.mydb.alter_test ADD COLUMN age INT") + .await; + + // FileSystemCatalog returns Unsupported for alter_table, which is expected + assert!( + result.is_err(), + "ALTER TABLE should fail because FileSystemCatalog does not implement alter_table yet" + ); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("not yet implemented") || err_msg.contains("Unsupported"), + "Error should indicate alter_table is not implemented, got: {err_msg}" + ); +} + +#[tokio::test] +async fn test_alter_table_rename() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "old_name"), schema, false) + .await + .unwrap(); + + handler + .sql("ALTER TABLE mydb.old_name RENAME TO new_name") + .await + .expect("ALTER TABLE RENAME should succeed"); + + let tables = catalog.list_tables("mydb").await.unwrap(); + assert!( + !tables.contains(&"old_name".to_string()), + "old_name should not exist after rename" + ); + assert!( + tables.contains(&"new_name".to_string()), + "new_name should exist after rename" + ); +} + +#[tokio::test] +async fn test_ddl_handler_delegates_select() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + + catalog + .create_database("mydb", false, Default::default()) + .await + .unwrap(); + + let schema = paimon::spec::Schema::builder() + .column( + "id", + paimon::spec::DataType::Int(paimon::spec::IntType::new()), + ) + .build() + .unwrap(); + catalog + .create_table(&Identifier::new("mydb", "t1"), schema, false) + .await + .unwrap(); + + // SELECT should be delegated to DataFusion + let df = handler + .sql("SELECT * FROM paimon.mydb.t1") + .await + .expect("SELECT should be delegated to DataFusion"); + + let batches = df.collect().await.expect("SELECT should execute"); + // Empty table, but should succeed + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 0, "Empty table should return 0 rows"); +} diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs index f48b9da3..e2f60fc4 100644 --- a/crates/paimon/src/arrow/mod.rs +++ b/crates/paimon/src/arrow/mod.rs @@ -22,7 +22,11 @@ pub(crate) mod schema_evolution; pub use crate::arrow::reader::ArrowReaderBuilder; -use crate::spec::{DataField, DataType as PaimonDataType}; +use crate::spec::{ + ArrayType, BigIntType, BooleanType, DataField, DataType as PaimonDataType, DateType, + DecimalType, DoubleType, FloatType, IntType, LocalZonedTimestampType, MapType, RowType, + SmallIntType, TimeType, TimestampType, TinyIntType, VarBinaryType, VarCharType, +}; use arrow_schema::DataType as ArrowDataType; use arrow_schema::{Field as ArrowField, Schema as ArrowSchema, TimeUnit}; use std::sync::Arc; @@ -128,6 +132,114 @@ fn timestamp_time_unit(precision: u32) -> crate::Result { } } +/// Convert an Arrow [`DataType`](ArrowDataType) to a Paimon [`DataType`](PaimonDataType). +pub fn arrow_to_paimon_type( + arrow_type: &ArrowDataType, + nullable: bool, +) -> crate::Result { + match arrow_type { + ArrowDataType::Boolean => Ok(PaimonDataType::Boolean(BooleanType::with_nullable( + nullable, + ))), + ArrowDataType::Int8 => Ok(PaimonDataType::TinyInt(TinyIntType::with_nullable( + nullable, + ))), + ArrowDataType::Int16 => Ok(PaimonDataType::SmallInt(SmallIntType::with_nullable( + nullable, + ))), + ArrowDataType::Int32 => Ok(PaimonDataType::Int(IntType::with_nullable(nullable))), + ArrowDataType::Int64 => Ok(PaimonDataType::BigInt(BigIntType::with_nullable(nullable))), + ArrowDataType::Float32 => Ok(PaimonDataType::Float(FloatType::with_nullable(nullable))), + ArrowDataType::Float64 => Ok(PaimonDataType::Double(DoubleType::with_nullable(nullable))), + ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Utf8View => { + Ok(PaimonDataType::VarChar(VarCharType::with_nullable( + nullable, + VarCharType::MAX_LENGTH, + )?)) + } + ArrowDataType::Binary | ArrowDataType::LargeBinary | ArrowDataType::BinaryView => Ok( + PaimonDataType::VarBinary(VarBinaryType::try_new(nullable, VarBinaryType::MAX_LENGTH)?), + ), + ArrowDataType::Date32 => Ok(PaimonDataType::Date(DateType::with_nullable(nullable))), + ArrowDataType::Timestamp(unit, tz) => { + let precision = match unit { + TimeUnit::Second => 0, + TimeUnit::Millisecond => 3, + TimeUnit::Microsecond => 6, + TimeUnit::Nanosecond => 9, + }; + if tz.is_some() { + Ok(PaimonDataType::LocalZonedTimestamp( + LocalZonedTimestampType::with_nullable(nullable, precision)?, + )) + } else { + Ok(PaimonDataType::Timestamp(TimestampType::with_nullable( + nullable, precision, + )?)) + } + } + ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => { + let precision = match arrow_type { + ArrowDataType::Time32(TimeUnit::Second) => 0, + ArrowDataType::Time32(TimeUnit::Millisecond) => 3, + ArrowDataType::Time64(TimeUnit::Microsecond) => 6, + ArrowDataType::Time64(TimeUnit::Nanosecond) => 9, + _ => 0, + }; + Ok(PaimonDataType::Time(TimeType::with_nullable( + nullable, precision, + )?)) + } + ArrowDataType::Decimal128(p, s) => Ok(PaimonDataType::Decimal(DecimalType::with_nullable( + nullable, *p as u32, *s as u32, + )?)), + ArrowDataType::List(field) | ArrowDataType::LargeList(field) => { + let element = arrow_to_paimon_type(field.data_type(), field.is_nullable())?; + Ok(PaimonDataType::Array(ArrayType::with_nullable( + nullable, element, + ))) + } + ArrowDataType::Map(entries_field, _) => { + if let ArrowDataType::Struct(fields) = entries_field.data_type() { + if fields.len() == 2 { + let key = arrow_to_paimon_type(fields[0].data_type(), fields[0].is_nullable())?; + let value = + arrow_to_paimon_type(fields[1].data_type(), fields[1].is_nullable())?; + return Ok(PaimonDataType::Map(MapType::with_nullable( + nullable, key, value, + ))); + } + } + Err(crate::Error::Unsupported { + message: format!("Unsupported Map structure: {arrow_type:?}"), + }) + } + ArrowDataType::Struct(fields) => { + let field_slice: Vec = fields.iter().map(|f| f.as_ref().clone()).collect(); + let paimon_fields = arrow_fields_to_paimon(&field_slice)?; + Ok(PaimonDataType::Row(RowType::with_nullable( + nullable, + paimon_fields, + ))) + } + _ => Err(crate::Error::Unsupported { + message: format!("Unsupported Arrow type for Paimon conversion: {arrow_type:?}"), + }), + } +} + +/// Convert Arrow fields to Paimon [`DataField`]s with auto-assigned IDs starting from 0. +pub fn arrow_fields_to_paimon(fields: &[ArrowField]) -> crate::Result> { + fields + .iter() + .enumerate() + .map(|(i, f)| { + let paimon_type = arrow_to_paimon_type(f.data_type(), f.is_nullable())?; + Ok(DataField::new(i as i32, f.name().clone(), paimon_type)) + }) + .collect() +} + /// Build an Arrow [`Schema`](ArrowSchema) from Paimon [`DataField`]s. pub fn build_target_arrow_schema(fields: &[DataField]) -> crate::Result> { let arrow_fields: Vec = fields @@ -143,3 +255,213 @@ pub fn build_target_arrow_schema(fields: &[DataField]) -> crate::Result>>()?; Ok(Arc::new(ArrowSchema::new(arrow_fields))) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::*; + + /// Helper: paimon -> arrow -> paimon roundtrip, assert the arrow type matches expected. + fn assert_paimon_to_arrow(paimon: &PaimonDataType, expected_arrow: &ArrowDataType) { + let arrow = paimon_type_to_arrow(paimon).unwrap(); + assert_eq!(&arrow, expected_arrow, "paimon_type_to_arrow mismatch"); + } + + /// Helper: arrow -> paimon, assert the paimon type variant matches. + fn assert_arrow_to_paimon( + arrow: &ArrowDataType, + nullable: bool, + expected_paimon: &PaimonDataType, + ) { + let paimon = arrow_to_paimon_type(arrow, nullable).unwrap(); + assert_eq!(&paimon, expected_paimon, "arrow_to_paimon_type mismatch"); + } + + #[test] + fn test_primitive_roundtrip() { + let cases: Vec<(PaimonDataType, ArrowDataType)> = vec![ + ( + PaimonDataType::Boolean(BooleanType::new()), + ArrowDataType::Boolean, + ), + ( + PaimonDataType::TinyInt(TinyIntType::new()), + ArrowDataType::Int8, + ), + ( + PaimonDataType::SmallInt(SmallIntType::new()), + ArrowDataType::Int16, + ), + (PaimonDataType::Int(IntType::new()), ArrowDataType::Int32), + ( + PaimonDataType::BigInt(BigIntType::new()), + ArrowDataType::Int64, + ), + ( + PaimonDataType::Float(FloatType::new()), + ArrowDataType::Float32, + ), + ( + PaimonDataType::Double(DoubleType::new()), + ArrowDataType::Float64, + ), + (PaimonDataType::Date(DateType::new()), ArrowDataType::Date32), + ]; + for (paimon, arrow) in &cases { + assert_paimon_to_arrow(paimon, arrow); + assert_arrow_to_paimon(arrow, true, paimon); + } + } + + #[test] + fn test_string_types() { + let varchar = PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()); + assert_paimon_to_arrow(&varchar, &ArrowDataType::Utf8); + + // All string-like arrow types map to VarChar + for arrow in &[ + ArrowDataType::Utf8, + ArrowDataType::LargeUtf8, + ArrowDataType::Utf8View, + ] { + assert_arrow_to_paimon(arrow, true, &varchar); + } + } + + #[test] + fn test_binary_types() { + let varbinary = PaimonDataType::VarBinary( + VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(), + ); + assert_paimon_to_arrow(&varbinary, &ArrowDataType::Binary); + + for arrow in &[ + ArrowDataType::Binary, + ArrowDataType::LargeBinary, + ArrowDataType::BinaryView, + ] { + assert_arrow_to_paimon(arrow, true, &varbinary); + } + } + + #[test] + fn test_timestamp_roundtrip() { + // millisecond precision + let ts3 = PaimonDataType::Timestamp(TimestampType::new(3).unwrap()); + assert_paimon_to_arrow(&ts3, &ArrowDataType::Timestamp(TimeUnit::Millisecond, None)); + assert_arrow_to_paimon( + &ArrowDataType::Timestamp(TimeUnit::Millisecond, None), + true, + &ts3, + ); + + // microsecond precision + let ts6 = PaimonDataType::Timestamp(TimestampType::new(6).unwrap()); + assert_paimon_to_arrow(&ts6, &ArrowDataType::Timestamp(TimeUnit::Microsecond, None)); + assert_arrow_to_paimon( + &ArrowDataType::Timestamp(TimeUnit::Microsecond, None), + true, + &ts6, + ); + + // nanosecond precision + let ts9 = PaimonDataType::Timestamp(TimestampType::new(9).unwrap()); + assert_paimon_to_arrow(&ts9, &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)); + assert_arrow_to_paimon( + &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), + true, + &ts9, + ); + } + + #[test] + fn test_local_zoned_timestamp() { + let lzts = PaimonDataType::LocalZonedTimestamp(LocalZonedTimestampType::new(3).unwrap()); + let arrow = ArrowDataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())); + assert_paimon_to_arrow(&lzts, &arrow); + assert_arrow_to_paimon(&arrow, true, &lzts); + } + + #[test] + fn test_decimal_roundtrip() { + let dec = PaimonDataType::Decimal(DecimalType::new(10, 2).unwrap()); + assert_paimon_to_arrow(&dec, &ArrowDataType::Decimal128(10, 2)); + assert_arrow_to_paimon(&ArrowDataType::Decimal128(10, 2), true, &dec); + } + + #[test] + fn test_array_roundtrip() { + let paimon_arr = PaimonDataType::Array(ArrayType::new(PaimonDataType::Int(IntType::new()))); + let arrow_list = ArrowDataType::List(Arc::new(ArrowField::new( + "element", + ArrowDataType::Int32, + true, + ))); + assert_paimon_to_arrow(&paimon_arr, &arrow_list); + + // arrow -> paimon: element field name doesn't matter + let arrow_list2 = ArrowDataType::List(Arc::new(ArrowField::new( + "item", + ArrowDataType::Int32, + true, + ))); + let result = arrow_to_paimon_type(&arrow_list2, true).unwrap(); + assert!(matches!(result, PaimonDataType::Array(_))); + } + + #[test] + fn test_map_roundtrip() { + let paimon_map = PaimonDataType::Map(MapType::new( + PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()), + PaimonDataType::Int(IntType::new()), + )); + let arrow_map = paimon_type_to_arrow(&paimon_map).unwrap(); + let back = arrow_to_paimon_type(&arrow_map, true).unwrap(); + assert!(matches!(back, PaimonDataType::Map(_))); + } + + #[test] + fn test_row_roundtrip() { + let row = PaimonDataType::Row(RowType::new(vec![ + DataField::new(0, "a".to_string(), PaimonDataType::Int(IntType::new())), + DataField::new( + 1, + "b".to_string(), + PaimonDataType::VarChar(VarCharType::new(VarCharType::MAX_LENGTH).unwrap()), + ), + ])); + let arrow = paimon_type_to_arrow(&row).unwrap(); + let back = arrow_to_paimon_type(&arrow, true).unwrap(); + assert!(matches!(back, PaimonDataType::Row(_))); + } + + #[test] + fn test_not_nullable() { + let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, false).unwrap(); + assert!(!paimon.is_nullable()); + + let paimon = arrow_to_paimon_type(&ArrowDataType::Int32, true).unwrap(); + assert!(paimon.is_nullable()); + } + + #[test] + fn test_unsupported_arrow_type() { + let result = arrow_to_paimon_type(&ArrowDataType::Duration(TimeUnit::Second), true); + assert!(result.is_err()); + } + + #[test] + fn test_arrow_fields_to_paimon_ids() { + let fields = vec![ + ArrowField::new("x", ArrowDataType::Int32, true), + ArrowField::new("y", ArrowDataType::Utf8, false), + ]; + let paimon_fields = arrow_fields_to_paimon(&fields).unwrap(); + assert_eq!(paimon_fields.len(), 2); + assert_eq!(paimon_fields[0].id(), 0); + assert_eq!(paimon_fields[0].name(), "x"); + assert_eq!(paimon_fields[1].id(), 1); + assert_eq!(paimon_fields[1].name(), "y"); + assert!(!paimon_fields[1].data_type().is_nullable()); + } +}