Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ mod tests {
use paimon::spec::{
BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema,
};
use paimon::table::Table;
use std::fs;
use tempfile::tempdir;
use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
Expand Down Expand Up @@ -298,6 +299,7 @@ mod tests {
Identifier::new("test_db", "test_table"),
"/tmp/test-table".to_string(),
table_schema,
None,
)
}

Expand Down Expand Up @@ -329,6 +331,7 @@ mod tests {
Identifier::new("default", "t"),
table_path,
table_schema,
None,
);

let split = paimon::DataSplitBuilder::new()
Expand Down
12 changes: 12 additions & 0 deletions crates/paimon/src/api/resource_paths.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ impl ResourcePaths {
pub fn rename_table(&self) -> String {
    // The path is the base path, the tables segment, and a literal `rename` suffix.
    format!(
        "{base}/{tables}/rename",
        base = self.base_path,
        tables = Self::TABLES
    )
}

/// Build the endpoint path used to commit to a table.
///
/// Both `database_name` and `table_name` are passed through
/// `RESTUtil::encode_string` before being placed into the path.
pub fn commit_table(&self, database_name: &str, table_name: &str) -> String {
    let encoded_database = RESTUtil::encode_string(database_name);
    let encoded_table = RESTUtil::encode_string(table_name);
    format!(
        "{base}/{databases}/{database}/{tables}/{table}/commit",
        base = self.base_path,
        databases = Self::DATABASES,
        database = encoded_database,
        tables = Self::TABLES,
        table = encoded_table
    )
}
}

#[cfg(test)]
Expand Down
30 changes: 29 additions & 1 deletion crates/paimon/src/api/rest_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::collections::HashMap;
use crate::api::rest_client::HttpClient;
use crate::catalog::Identifier;
use crate::common::{CatalogOptions, Options};
use crate::spec::Schema;
use crate::spec::{PartitionStatistics, Schema, Snapshot};
use crate::Result;

use super::api_request::{
Expand Down Expand Up @@ -391,4 +391,32 @@ impl RESTApi {
let path = self.resource_paths.table_token(database, table);
self.client.get(&path, None::<&[(&str, &str)]>).await
}

// ==================== Commit Operations ====================

/// Commit a snapshot for a table.
///
/// Posts a JSON body with the keys `tableUuid`, `snapshot` and `statistics`
/// to the table's commit endpoint and returns the boolean `success` field of
/// the response. A response without a boolean `success` field yields `false`.
///
/// Corresponds to Python `RESTApi.commit_snapshot`.
///
/// # Errors
///
/// Propagates the error from the database/table name validation and from the
/// POST request.
pub async fn commit_snapshot(
    &self,
    identifier: &Identifier,
    table_uuid: &str,
    snapshot: &Snapshot,
    statistics: &[PartitionStatistics],
) -> Result<bool> {
    let (db_name, table_name) = (identifier.database(), identifier.object());
    validate_non_empty_multi(&[(db_name, "database name"), (table_name, "table name")])?;

    let endpoint = self.resource_paths.commit_table(db_name, table_name);
    let body = serde_json::json!({
        "tableUuid": table_uuid,
        "snapshot": snapshot,
        "statistics": statistics,
    });

    let response: serde_json::Value = self.client.post(&endpoint, &body).await?;
    // Anything other than an explicit boolean is reported as "not committed".
    let committed = match response.get("success") {
        Some(flag) => flag.as_bool().unwrap_or(false),
        None => false,
    };
    Ok(committed)
}
}
1 change: 1 addition & 0 deletions crates/paimon/src/catalog/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ impl Catalog for FileSystemCatalog {
identifier.clone(),
table_path,
schema,
None,
))
}

Expand Down
21 changes: 17 additions & 4 deletions crates/paimon/src/catalog/rest/rest_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! a Paimon REST catalog server for database and table CRUD operations.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;

Expand All @@ -32,7 +33,7 @@ use crate::common::{CatalogOptions, Options};
use crate::error::Error;
use crate::io::FileIO;
use crate::spec::{Schema, SchemaChange, TableSchema};
use crate::table::Table;
use crate::table::{RESTEnv, Table};
use crate::Result;

use super::rest_token_file_io::RESTTokenFileIO;
Expand All @@ -44,8 +45,8 @@ use super::rest_token_file_io::RESTTokenFileIO;
///
/// Corresponds to Python `RESTCatalog` in `pypaimon/catalog/rest/rest_catalog.py`.
pub struct RESTCatalog {
/// The REST API client.
api: RESTApi,
/// The REST API client (shared with RESTEnv).
api: Arc<RESTApi>,
/// Catalog configuration options.
options: Options,
/// Warehouse path.
Expand All @@ -71,7 +72,7 @@ impl RESTCatalog {
message: format!("Missing required option: {}", CatalogOptions::WAREHOUSE),
})?;

let api = RESTApi::new(options.clone(), config_required).await?;
let api = Arc::new(RESTApi::new(options.clone(), config_required).await?);

let data_token_enabled = api
.options()
Expand Down Expand Up @@ -232,6 +233,15 @@ impl Catalog for RESTCatalog {
source: None,
})?;

// Extract table uuid for RESTEnv
let uuid = response.id.ok_or_else(|| Error::DataInvalid {
message: format!(
"Table {} response missing id (uuid)",
identifier.full_name()
),
source: None,
})?;

// Build FileIO based on data_token_enabled and is_external
// TODO Support token cache and direct oss access
let file_io = if self.data_token_enabled && !is_external {
Expand All @@ -244,11 +254,14 @@ impl Catalog for RESTCatalog {
FileIO::from_path(&table_path)?.build()?
};

let rest_env = RESTEnv::new(identifier.clone(), uuid, self.api.clone());

Ok(Table::new(
file_io,
identifier.clone(),
table_path,
table_schema,
Some(rest_env),
))
}

Expand Down
5 changes: 3 additions & 2 deletions crates/paimon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub use catalog::CatalogFactory;
pub use catalog::FileSystemCatalog;

pub use table::{
DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, RowRange,
SnapshotManager, Table, TableRead, TableScan, TagManager,
CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RESTEnv,
RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, SnapshotCommit,
SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager, WriteBuilder,
};
149 changes: 149 additions & 0 deletions crates/paimon/src/spec/binary_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! and BinaryRowBuilder for constructing BinaryRow instances.

use crate::spec::murmur_hash::hash_by_words;
use crate::spec::{DataType, Datum};
use serde::{Deserialize, Serialize};

pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);
Expand Down Expand Up @@ -523,6 +524,87 @@ impl BinaryRowBuilder {
serialized.extend_from_slice(&self.data);
serialized
}

/// Write `datum` into the field at `pos`, picking the writer from the datum's variant.
///
/// * Timestamps use the compact writer when the precision taken from `data_type`
///   is at most 3, and the non-compact writer otherwise. If `data_type` is not the
///   timestamp type matching the datum variant, the precision is treated as 3.
/// * Decimals use the compact writer (unscaled value cast to `i64`) when the
///   datum's own precision is at most 18, and the variable-length writer otherwise.
/// * Strings and byte arrays of at most 7 bytes go through the `*_inline` writers;
///   longer values go through `write_string` / `write_binary`.
pub fn write_datum(&mut self, pos: usize, datum: &Datum, data_type: &DataType) {
    match datum {
        Datum::Bool(flag) => self.write_boolean(pos, *flag),
        Datum::TinyInt(n) => self.write_byte(pos, *n),
        Datum::SmallInt(n) => self.write_short(pos, *n),
        Datum::Int(n) | Datum::Date(n) | Datum::Time(n) => self.write_int(pos, *n),
        Datum::Long(n) => self.write_long(pos, *n),
        Datum::Float(x) => self.write_float(pos, *x),
        Datum::Double(x) => self.write_double(pos, *x),
        Datum::Timestamp { millis, nanos } => {
            // A mismatched data type falls back to the compact form.
            let compact = match data_type {
                DataType::Timestamp(ts_type) => ts_type.precision() <= 3,
                _ => true,
            };
            if compact {
                self.write_timestamp_compact(pos, *millis);
            } else {
                self.write_timestamp_non_compact(pos, *millis, *nanos);
            }
        }
        Datum::LocalZonedTimestamp { millis, nanos } => {
            // A mismatched data type falls back to the compact form.
            let compact = match data_type {
                DataType::LocalZonedTimestamp(ts_type) => ts_type.precision() <= 3,
                _ => true,
            };
            if compact {
                self.write_timestamp_compact(pos, *millis);
            } else {
                self.write_timestamp_non_compact(pos, *millis, *nanos);
            }
        }
        Datum::Decimal {
            unscaled,
            precision,
            ..
        } => {
            if *precision > 18 {
                self.write_decimal_var_len(pos, *unscaled);
            } else {
                self.write_decimal_compact(pos, *unscaled as i64);
            }
        }
        Datum::String(text) => match text.len() {
            0..=7 => {
                self.write_string_inline(pos, text);
            }
            _ => {
                self.write_string(pos, text);
            }
        },
        Datum::Bytes(bytes) => match bytes.len() {
            0..=7 => {
                self.write_binary_inline(pos, bytes);
            }
            _ => {
                self.write_binary(pos, bytes);
            }
        },
    }
}
}

/// Serialize a sequence of optional datums into BinaryRow bytes.
///
/// Each entry pairs an optional value with its data type. `Some` values are
/// written with `BinaryRowBuilder::write_datum`; `None` values are marked null.
/// The row arity equals `datums.len()`.
///
/// Returns an empty vector when no entry holds a value, which includes an
/// empty `datums` slice.
pub fn datums_to_binary_row(datums: &[(&Option<Datum>, &DataType)]) -> Vec<u8> {
    let has_value = datums.iter().any(|(slot, _)| slot.is_some());
    if !has_value {
        return Vec::new();
    }

    let mut row = BinaryRowBuilder::new(datums.len() as i32);
    for (index, (slot, data_type)) in datums.iter().enumerate() {
        if let Some(datum) = slot {
            row.write_datum(index, datum, data_type);
        } else {
            row.set_null_at(index);
        }
    }
    row.build_serialized()
}

#[cfg(test)]
Expand Down Expand Up @@ -756,6 +838,73 @@ mod tests {
assert_eq!(nano, 0);
}

/// An int and a short string written via `write_datum` read back unchanged.
#[test]
fn test_write_datum_int_and_string() {
    let int_type = DataType::Int(crate::spec::IntType::new());
    let string_type = DataType::VarChar(crate::spec::VarCharType::string_type());
    let int_value = Datum::Int(42);
    let string_value = Datum::String("hello".to_string());

    let mut row_builder = BinaryRowBuilder::new(2);
    row_builder.write_datum(0, &int_value, &int_type);
    row_builder.write_datum(1, &string_value, &string_type);

    let built = row_builder.build();
    assert_eq!(built.get_int(0).unwrap(), 42);
    assert_eq!(built.get_string(1).unwrap(), "hello");
}

/// A string longer than 7 bytes written via `write_datum` reads back unchanged.
#[test]
fn test_write_datum_long_string() {
    let string_type = DataType::VarChar(crate::spec::VarCharType::string_type());
    let long_value = Datum::String("long_string_value".to_string());

    let mut row_builder = BinaryRowBuilder::new(1);
    row_builder.write_datum(0, &long_value, &string_type);

    let built = row_builder.build();
    assert_eq!(built.get_string(0).unwrap(), "long_string_value");
}

/// Bytes produced by `datums_to_binary_row` deserialize back to the same values.
#[test]
fn test_datums_to_binary_row_roundtrip() {
    let int_type = DataType::Int(crate::spec::IntType::new());
    let string_type = DataType::VarChar(crate::spec::VarCharType::string_type());
    let int_value = Some(Datum::Int(100));
    let string_value = Some(Datum::String("abc".to_string()));

    let columns = [(&int_value, &int_type), (&string_value, &string_type)];
    let serialized = datums_to_binary_row(&columns);
    assert!(!serialized.is_empty());

    let decoded = BinaryRow::from_serialized_bytes(&serialized).unwrap();
    assert_eq!(decoded.get_int(0).unwrap(), 100);
    assert_eq!(decoded.get_string(1).unwrap(), "abc");
}

/// When every datum is `None`, `datums_to_binary_row` yields no bytes.
#[test]
fn test_datums_to_binary_row_all_none() {
    let int_type = DataType::Int(crate::spec::IntType::new());
    let missing: Option<Datum> = None;

    let columns = [(&missing, &int_type)];
    let serialized = datums_to_binary_row(&columns);
    assert!(serialized.is_empty());
}

/// A mix of present and absent datums keeps the value and marks the gap as null.
#[test]
fn test_datums_to_binary_row_mixed_null() {
    let first_type = DataType::Int(crate::spec::IntType::new());
    let second_type = DataType::Int(crate::spec::IntType::new());
    let present = Some(Datum::Int(7));
    let absent: Option<Datum> = None;

    let columns = [(&present, &first_type), (&absent, &second_type)];
    let serialized = datums_to_binary_row(&columns);
    assert!(!serialized.is_empty());

    let decoded = BinaryRow::from_serialized_bytes(&serialized).unwrap();
    assert_eq!(decoded.get_int(0).unwrap(), 7);
    assert!(decoded.is_null_at(1));
}

#[test]
fn test_get_timestamp_non_compact() {
let epoch_millis: i64 = 1_704_067_200_123;
Expand Down
Loading
Loading