Skip to content

Commit db71611

Browse files
authored
feat(table): Add commit pipeline with SnapshotCommit abstraction (#233)
Implement the table write and commit infrastructure, including:

- SnapshotCommit trait with RenamingSnapshotCommit (filesystem) and RESTSnapshotCommit (REST catalog) implementations
- TableCommit with retry logic, append/overwrite/truncate support, partition statistics generation, and row tracking
- WriteBuilder as the entry point for creating TableCommit instances, with overwrite mode configured at construction time
- RESTEnv to hold REST catalog context (API client, identifier, uuid)
- CommitMessage, PartitionStatistics, and ManifestList types
- SnapshotManager extensions for atomic snapshot commit and latest hint
- BinaryRow write_datum and datums_to_binary_row utilities
- CoreOptions accessors for bucket, commit retry, and row tracking

Reference: pypaimon commit pipeline
1 parent e0d1f69 commit db71611

27 files changed

Lines changed: 2158 additions & 55 deletions

crates/integrations/datafusion/src/physical_plan/scan.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -248,6 +248,7 @@ mod tests {
248248
use paimon::spec::{
249249
BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema,
250250
};
251+
use paimon::table::Table;
251252
use std::fs;
252253
use tempfile::tempdir;
253254
use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
@@ -298,6 +299,7 @@ mod tests {
298299
Identifier::new("test_db", "test_table"),
299300
"/tmp/test-table".to_string(),
300301
table_schema,
302+
None,
301303
)
302304
}
303305

@@ -329,6 +331,7 @@ mod tests {
329331
Identifier::new("default", "t"),
330332
table_path,
331333
table_schema,
334+
None,
332335
);
333336

334337
let split = paimon::DataSplitBuilder::new()

crates/paimon/src/api/resource_paths.rs

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,18 @@ impl ResourcePaths {
131131
pub fn rename_table(&self) -> String {
132132
format!("{}/{}/rename", self.base_path, Self::TABLES)
133133
}
134+
135+
/// Get the commit table endpoint path.
136+
pub fn commit_table(&self, database_name: &str, table_name: &str) -> String {
137+
format!(
138+
"{}/{}/{}/{}/{}/commit",
139+
self.base_path,
140+
Self::DATABASES,
141+
RESTUtil::encode_string(database_name),
142+
Self::TABLES,
143+
RESTUtil::encode_string(table_name)
144+
)
145+
}
134146
}
135147

136148
#[cfg(test)]

crates/paimon/src/api/rest_api.rs

Lines changed: 29 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ use std::collections::HashMap;
2525
use crate::api::rest_client::HttpClient;
2626
use crate::catalog::Identifier;
2727
use crate::common::{CatalogOptions, Options};
28-
use crate::spec::Schema;
28+
use crate::spec::{PartitionStatistics, Schema, Snapshot};
2929
use crate::Result;
3030

3131
use super::api_request::{
@@ -391,4 +391,32 @@ impl RESTApi {
391391
let path = self.resource_paths.table_token(database, table);
392392
self.client.get(&path, None::<&[(&str, &str)]>).await
393393
}
394+
395+
// ==================== Commit Operations ====================
396+
397+
/// Commit a snapshot for a table.
398+
///
399+
/// Corresponds to Python `RESTApi.commit_snapshot`.
400+
pub async fn commit_snapshot(
401+
&self,
402+
identifier: &Identifier,
403+
table_uuid: &str,
404+
snapshot: &Snapshot,
405+
statistics: &[PartitionStatistics],
406+
) -> Result<bool> {
407+
let database = identifier.database();
408+
let table = identifier.object();
409+
validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?;
410+
let path = self.resource_paths.commit_table(database, table);
411+
let request = serde_json::json!({
412+
"tableUuid": table_uuid,
413+
"snapshot": snapshot,
414+
"statistics": statistics,
415+
});
416+
let resp: serde_json::Value = self.client.post(&path, &request).await?;
417+
Ok(resp
418+
.get("success")
419+
.and_then(|v| v.as_bool())
420+
.unwrap_or(false))
421+
}
394422
}

crates/paimon/src/catalog/filesystem.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -327,6 +327,7 @@ impl Catalog for FileSystemCatalog {
327327
identifier.clone(),
328328
table_path,
329329
schema,
330+
None,
330331
))
331332
}
332333

crates/paimon/src/catalog/rest/rest_catalog.rs

Lines changed: 17 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
//! a Paimon REST catalog server for database and table CRUD operations.
2222
2323
use std::collections::HashMap;
24+
use std::sync::Arc;
2425

2526
use async_trait::async_trait;
2627

@@ -32,7 +33,7 @@ use crate::common::{CatalogOptions, Options};
3233
use crate::error::Error;
3334
use crate::io::FileIO;
3435
use crate::spec::{Schema, SchemaChange, TableSchema};
35-
use crate::table::Table;
36+
use crate::table::{RESTEnv, Table};
3637
use crate::Result;
3738

3839
use super::rest_token_file_io::RESTTokenFileIO;
@@ -44,8 +45,8 @@ use super::rest_token_file_io::RESTTokenFileIO;
4445
///
4546
/// Corresponds to Python `RESTCatalog` in `pypaimon/catalog/rest/rest_catalog.py`.
4647
pub struct RESTCatalog {
47-
/// The REST API client.
48-
api: RESTApi,
48+
/// The REST API client (shared with RESTEnv).
49+
api: Arc<RESTApi>,
4950
/// Catalog configuration options.
5051
options: Options,
5152
/// Warehouse path.
@@ -71,7 +72,7 @@ impl RESTCatalog {
7172
message: format!("Missing required option: {}", CatalogOptions::WAREHOUSE),
7273
})?;
7374

74-
let api = RESTApi::new(options.clone(), config_required).await?;
75+
let api = Arc::new(RESTApi::new(options.clone(), config_required).await?);
7576

7677
let data_token_enabled = api
7778
.options()
@@ -232,6 +233,15 @@ impl Catalog for RESTCatalog {
232233
source: None,
233234
})?;
234235

236+
// Extract table uuid for RESTEnv
237+
let uuid = response.id.ok_or_else(|| Error::DataInvalid {
238+
message: format!(
239+
"Table {} response missing id (uuid)",
240+
identifier.full_name()
241+
),
242+
source: None,
243+
})?;
244+
235245
// Build FileIO based on data_token_enabled and is_external
236246
// TODO Support token cache and direct oss access
237247
let file_io = if self.data_token_enabled && !is_external {
@@ -244,11 +254,14 @@ impl Catalog for RESTCatalog {
244254
FileIO::from_path(&table_path)?.build()?
245255
};
246256

257+
let rest_env = RESTEnv::new(identifier.clone(), uuid, self.api.clone());
258+
247259
Ok(Table::new(
248260
file_io,
249261
identifier.clone(),
250262
table_path,
251263
table_schema,
264+
Some(rest_env),
252265
))
253266
}
254267

crates/paimon/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ pub use catalog::CatalogFactory;
4242
pub use catalog::FileSystemCatalog;
4343

4444
pub use table::{
45-
DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, RowRange,
46-
SnapshotManager, Table, TableRead, TableScan, TagManager,
45+
CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RESTEnv,
46+
RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, SnapshotCommit,
47+
SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager, WriteBuilder,
4748
};

crates/paimon/src/spec/binary_row.rs

Lines changed: 149 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
//! and BinaryRowBuilder for constructing BinaryRow instances.
2020
2121
use crate::spec::murmur_hash::hash_by_words;
22+
use crate::spec::{DataType, Datum};
2223
use serde::{Deserialize, Serialize};
2324

2425
pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);
@@ -523,6 +524,87 @@ impl BinaryRowBuilder {
523524
serialized.extend_from_slice(&self.data);
524525
serialized
525526
}
527+
528+
/// Write a Datum value at the given position, dispatching by type.
529+
pub fn write_datum(&mut self, pos: usize, datum: &Datum, data_type: &DataType) {
530+
match datum {
531+
Datum::Bool(v) => self.write_boolean(pos, *v),
532+
Datum::TinyInt(v) => self.write_byte(pos, *v),
533+
Datum::SmallInt(v) => self.write_short(pos, *v),
534+
Datum::Int(v) | Datum::Date(v) | Datum::Time(v) => self.write_int(pos, *v),
535+
Datum::Long(v) => self.write_long(pos, *v),
536+
Datum::Float(v) => self.write_float(pos, *v),
537+
Datum::Double(v) => self.write_double(pos, *v),
538+
Datum::Timestamp { millis, nanos } => {
539+
let precision = match data_type {
540+
DataType::Timestamp(ts) => ts.precision(),
541+
_ => 3,
542+
};
543+
if precision <= 3 {
544+
self.write_timestamp_compact(pos, *millis);
545+
} else {
546+
self.write_timestamp_non_compact(pos, *millis, *nanos);
547+
}
548+
}
549+
Datum::LocalZonedTimestamp { millis, nanos } => {
550+
let precision = match data_type {
551+
DataType::LocalZonedTimestamp(ts) => ts.precision(),
552+
_ => 3,
553+
};
554+
if precision <= 3 {
555+
self.write_timestamp_compact(pos, *millis);
556+
} else {
557+
self.write_timestamp_non_compact(pos, *millis, *nanos);
558+
}
559+
}
560+
Datum::Decimal {
561+
unscaled,
562+
precision,
563+
..
564+
} => {
565+
if *precision <= 18 {
566+
self.write_decimal_compact(pos, *unscaled as i64);
567+
} else {
568+
self.write_decimal_var_len(pos, *unscaled);
569+
}
570+
}
571+
Datum::String(s) => {
572+
if s.len() <= 7 {
573+
self.write_string_inline(pos, s);
574+
} else {
575+
self.write_string(pos, s);
576+
}
577+
}
578+
Datum::Bytes(b) => {
579+
if b.len() <= 7 {
580+
self.write_binary_inline(pos, b);
581+
} else {
582+
self.write_binary(pos, b);
583+
}
584+
}
585+
}
586+
}
587+
}
588+
589+
/// Build a serialized BinaryRow from optional Datum values.
590+
/// Returns empty vec if all values are None.
591+
pub fn datums_to_binary_row(datums: &[(&Option<Datum>, &DataType)]) -> Vec<u8> {
592+
if datums.iter().all(|(d, _)| d.is_none()) {
593+
return vec![];
594+
}
595+
let arity = datums.len() as i32;
596+
let mut builder = BinaryRowBuilder::new(arity);
597+
for (pos, (datum_opt, data_type)) in datums.iter().enumerate() {
598+
match datum_opt {
599+
Some(datum) => {
600+
builder.write_datum(pos, datum, data_type);
601+
}
602+
None => {
603+
builder.set_null_at(pos);
604+
}
605+
}
606+
}
607+
builder.build_serialized()
526608
}
527609

528610
#[cfg(test)]
@@ -756,6 +838,73 @@ mod tests {
756838
assert_eq!(nano, 0);
757839
}
758840

841+
#[test]
842+
fn test_write_datum_int_and_string() {
843+
let mut builder = BinaryRowBuilder::new(2);
844+
builder.write_datum(
845+
0,
846+
&Datum::Int(42),
847+
&DataType::Int(crate::spec::IntType::new()),
848+
);
849+
builder.write_datum(
850+
1,
851+
&Datum::String("hello".to_string()),
852+
&DataType::VarChar(crate::spec::VarCharType::string_type()),
853+
);
854+
let row = builder.build();
855+
assert_eq!(row.get_int(0).unwrap(), 42);
856+
assert_eq!(row.get_string(1).unwrap(), "hello");
857+
}
858+
859+
#[test]
860+
fn test_write_datum_long_string() {
861+
let mut builder = BinaryRowBuilder::new(1);
862+
builder.write_datum(
863+
0,
864+
&Datum::String("long_string_value".to_string()),
865+
&DataType::VarChar(crate::spec::VarCharType::string_type()),
866+
);
867+
let row = builder.build();
868+
assert_eq!(row.get_string(0).unwrap(), "long_string_value");
869+
}
870+
871+
#[test]
872+
fn test_datums_to_binary_row_roundtrip() {
873+
let d1 = Some(Datum::Int(100));
874+
let d2 = Some(Datum::String("abc".to_string()));
875+
let dt1 = DataType::Int(crate::spec::IntType::new());
876+
let dt2 = DataType::VarChar(crate::spec::VarCharType::string_type());
877+
let datums = vec![(&d1, &dt1), (&d2, &dt2)];
878+
let bytes = datums_to_binary_row(&datums);
879+
assert!(!bytes.is_empty());
880+
let row = BinaryRow::from_serialized_bytes(&bytes).unwrap();
881+
assert_eq!(row.get_int(0).unwrap(), 100);
882+
assert_eq!(row.get_string(1).unwrap(), "abc");
883+
}
884+
885+
#[test]
886+
fn test_datums_to_binary_row_all_none() {
887+
let d1: Option<Datum> = None;
888+
let dt1 = DataType::Int(crate::spec::IntType::new());
889+
let datums = vec![(&d1, &dt1)];
890+
let bytes = datums_to_binary_row(&datums);
891+
assert!(bytes.is_empty());
892+
}
893+
894+
#[test]
895+
fn test_datums_to_binary_row_mixed_null() {
896+
let d1 = Some(Datum::Int(7));
897+
let d2: Option<Datum> = None;
898+
let dt1 = DataType::Int(crate::spec::IntType::new());
899+
let dt2 = DataType::Int(crate::spec::IntType::new());
900+
let datums = vec![(&d1, &dt1), (&d2, &dt2)];
901+
let bytes = datums_to_binary_row(&datums);
902+
assert!(!bytes.is_empty());
903+
let row = BinaryRow::from_serialized_bytes(&bytes).unwrap();
904+
assert_eq!(row.get_int(0).unwrap(), 7);
905+
assert!(row.is_null_at(1));
906+
}
907+
759908
#[test]
760909
fn test_get_timestamp_non_compact() {
761910
let epoch_millis: i64 = 1_704_067_200_123;

0 commit comments

Comments
 (0)