feat(write): add write pipeline with DataFusion INSERT INTO/OVERWRITE support#234

Open
JingsongLi wants to merge 2 commits into apache:main from JingsongLi:writer

Conversation

@JingsongLi (Contributor)

Purpose

Subtask of #232

Add TableWrite for writing Arrow RecordBatches to Paimon append-only tables. Each (partition, bucket) pair gets its own DataFileWriter with direct writes (matching delta-rs DeltaWriter pattern). File rolling uses tokio::spawn for background close, and prepare_commit uses try_join_all for parallel finalization across partition writers.

Key components:

  • TableWrite: routes batches by partition/bucket, holds DataFileWriters
  • DataFileWriter: manages parquet file lifecycle with rolling support
  • WriteBuilder: creates TableWrite and TableCommit instances
  • PaimonDataSink: DataFusion DataSink integration for INSERT/OVERWRITE
  • FormatFileWriter: extended with flush() and in_progress_size()
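The routing described above (one writer per (partition, bucket) pair, created lazily) can be sketched as follows. This is a minimal, self-contained simplification; `DataFileWriter` here is a stand-in counter, not the real parquet-backed writer, and the field and method names are illustrative only.

```rust
use std::collections::HashMap;

// Stand-in for the real parquet-backed writer: just counts rows.
#[derive(Default)]
struct DataFileWriter {
    rows_written: usize,
}

// Simplified TableWrite: routes each batch to the writer owned by
// its (partition, bucket) pair, creating the writer on first use.
#[derive(Default)]
struct TableWrite {
    writers: HashMap<(String, u32), DataFileWriter>,
}

impl TableWrite {
    fn write(&mut self, partition: &str, bucket: u32, num_rows: usize) {
        let writer = self
            .writers
            .entry((partition.to_string(), bucket))
            .or_default();
        writer.rows_written += num_rows;
    }
}

fn main() {
    let mut tw = TableWrite::default();
    tw.write("dt=2026-04-11", 0, 100);
    tw.write("dt=2026-04-11", 1, 50);
    tw.write("dt=2026-04-11", 0, 25);
    // Two distinct (partition, bucket) pairs -> two writers.
    assert_eq!(tw.writers.len(), 2);
    assert_eq!(tw.writers[&("dt=2026-04-11".to_string(), 0)].rows_written, 125);
}
```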

Configurable options via CoreOptions:

  • file.compression (default: zstd)
  • target-file-size (default: 256MB)
  • write.parquet-buffer-size (default: 256MB)
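A hedged sketch of how options like these could be resolved with their defaults. The keys come from the list above; the `HashMap`-based options store and the parsing helper are illustrative assumptions, not the actual CoreOptions API.

```rust
use std::collections::HashMap;

// Illustrative helper: read a byte-size option, falling back to a default
// when the key is absent or unparseable.
fn size_option(options: &HashMap<String, String>, key: &str, default: u64) -> u64 {
    options.get(key).and_then(|v| v.parse().ok()).unwrap_or(default)
}

fn main() {
    let options: HashMap<String, String> = HashMap::new();

    // With no overrides, the documented defaults apply.
    let compression = options
        .get("file.compression")
        .map(String::as_str)
        .unwrap_or("zstd");
    let target_file_size = size_option(&options, "target-file-size", 256 * 1024 * 1024);

    assert_eq!(compression, "zstd");
    assert_eq!(target_file_size, 268_435_456); // 256MB
}
```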

Includes E2E integration tests for unpartitioned, partitioned, fixed-bucket, multi-commit, column projection, and bucket filtering.

Brief change log

Tests

API and Format

Documentation

let row = BinaryRow::from_serialized_bytes(&msg.partition)?;
let mut spec = HashMap::new();
for (i, key) in partition_keys.iter().enumerate() {
    if let Some(datum) = extract_datum(&row, i, &data_types[i])? {
@littlecoder04 (Contributor), Apr 11, 2026
This will drop NULL partition keys from the overwrite predicate. I reproduced a case where overwriting the NULL partition also deletes other partitions.
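A minimal sketch of the fix this comment suggests: keep a NULL partition value in the spec as `None` instead of skipping the key, so the overwrite predicate can still target the NULL partition explicitly. `build_spec` and its `Option`-valued inputs are hypothetical simplifications of the real extractor.

```rust
use std::collections::HashMap;

// Build the partition spec for an overwrite, preserving NULL values.
// `values` stands in for the results of the real extract_datum calls.
fn build_spec(
    partition_keys: &[&str],
    values: &[Option<String>],
) -> HashMap<String, Option<String>> {
    let mut spec = HashMap::new();
    for (key, value) in partition_keys.iter().zip(values) {
        // A NULL value stays in the map as None rather than being dropped,
        // so the predicate still constrains this partition key.
        spec.insert(key.to_string(), value.clone());
    }
    spec
}

fn main() {
    let spec = build_spec(&["dt"], &[None]);
    // The NULL partition is still addressable by the overwrite predicate.
    assert_eq!(spec.get("dt"), Some(&None));
    assert_eq!(spec.len(), 1);
}
```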

@JingsongLi (Contributor, Author)
Good catch.

let datum = extract_datum_from_arrow(batch, row_idx, field_idx, field.data_type())?;
if let Some(d) = datum {
    datums.push((d, field.data_type().clone()));
}
Contributor
This will drop NULL bucket-key fields before hashing. Java preserves NULL positions here; see FixedBucketRowKeyExtractorTest.testUnCompactDecimalAndTimestampNullValueBucketNumber.
https://github.com/apache/paimon/blob/master/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
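The NULL-preserving behavior the comment asks for can be sketched like this: hash the `Option` value itself so a NULL field still occupies its position in the hash input instead of being skipped. `DefaultHasher` is only a stand-in for Paimon's actual bucket hash function, and the signature is illustrative.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash all bucket-key fields, keeping NULLs in place. Hashing the Option
// distinguishes None from any concrete value and preserves field positions.
fn bucket_of(fields: &[Option<i64>], num_buckets: u64) -> u64 {
    let mut h = DefaultHasher::new();
    for f in fields {
        f.hash(&mut h);
    }
    h.finish() % num_buckets
}

fn main() {
    // If the None were dropped before hashing, (1, NULL) and (NULL, 1)
    // would produce identical hash inputs; preserved, they stay distinct.
    let a = bucket_of(&[Some(1), None], 16);
    let b = bucket_of(&[None, Some(1)], 16);
    println!("buckets: {a} {b}");
    // The assignment is deterministic for a given row.
    assert_eq!(bucket_of(&[Some(1), None], 16), a);
}
```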
