Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e0be0ac
[python] [WIP] Initialize ray.merge_paimon connector
XiaoHongbo-Hope May 25, 2026
ef359af
[python] Fix ray merge_paimon NameError and schema alignment bugs
XiaoHongbo-Hope May 28, 2026
308311c
[python] Fix ray merge_paimon multi-source semantics and deprecated d…
XiaoHongbo-Hope May 28, 2026
2a7a4fb
[python] refactor ray merge_into module
XiaoHongbo-Hope May 28, 2026
714bb29
[python] wire ray merge_into not-matched INSERT path
XiaoHongbo-Hope May 28, 2026
99b88dd
[python] wire ray merge_into matched UPDATE path
XiaoHongbo-Hope May 28, 2026
ebe771b
[python] simplify ray merge_into clause surface
XiaoHongbo-Hope May 28, 2026
57b0a45
[python] redesign ray merge_into clause API
XiaoHongbo-Hope May 28, 2026
ccd9c9d
[python] revert unused shuffle helper extraction
XiaoHongbo-Hope May 28, 2026
81438c5
[python] refactor ray merge_into execution path
XiaoHongbo-Hope May 28, 2026
9384be0
[python] parallelize ray merge_into insert path
XiaoHongbo-Hope May 29, 2026
41024dc
[python] parallelize ray merge_into self-merge path
XiaoHongbo-Hope May 29, 2026
3143865
[python] distribute ray merge_into update path end-to-end
XiaoHongbo-Hope May 29, 2026
333eb31
[python] address ray merge_into review findings
XiaoHongbo-Hope May 29, 2026
a1b8a93
[python] tighten ray merge_into edge cases and src-row identity
XiaoHongbo-Hope May 29, 2026
c4af065
[python] optimize ray merge_into hot path with vectorized fallback
XiaoHongbo-Hope May 29, 2026
5fc8a7e
[python] align ray merge_into cardinality and self-merge with spark
XiaoHongbo-Hope May 29, 2026
0c04650
[python] distribute ray merge_into not-matched insert via anti-join
XiaoHongbo-Hope May 29, 2026
51d1bb5
[python] guard ray merge_into correctness and dependency edges
XiaoHongbo-Hope May 29, 2026
bacd07e
[python] narrow ray merge_into blob guard to update path
XiaoHongbo-Hope May 29, 2026
2b76ad2
[python] size ray merge_into update shuffle to actual group count
XiaoHongbo-Hope May 29, 2026
ad70c4d
[python] vectorize _assign_frid row-id bucketing
XiaoHongbo-Hope May 29, 2026
e9fb3a6
[python] fuse ray merge_into matched/not-matched into one outer join
XiaoHongbo-Hope May 29, 2026
74a56a5
[python] cover ray merge_into unified path with both clauses
XiaoHongbo-Hope May 29, 2026
e2ce4ad
[python] mint ray merge_into src idx per-block, fix '*' rename and em…
XiaoHongbo-Hope May 29, 2026
1cb429a
[python] align ray merge_into global-index handling with Spark
XiaoHongbo-Hope May 30, 2026
d67579b
[python] drop redundant merge_into strict-mode guard, rely on row-id …
XiaoHongbo-Hope May 30, 2026
fc44a9d
[python] drive merge_into update-write parallelism by num_partitions,…
XiaoHongbo-Hope May 30, 2026
d838567
[python] project only needed target columns into merge_into update join
XiaoHongbo-Hope May 30, 2026
bdf0049
[python] add merge_into condition_cols for precise target projection,…
XiaoHongbo-Hope May 30, 2026
92b4590
[python] merge_into: string-expression conditions, drop merge_conditi…
XiaoHongbo-Hope May 30, 2026
68152a1
[python] doc: document Ray merge_into in ray-data.md
XiaoHongbo-Hope May 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/docs/pypaimon/ray-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,48 @@ write_builder = table.new_batch_write_builder().overwrite()
# overwrite partition 'dt=2024-01-01'
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
```

## Merge Into

`merge_into` updates (and optionally inserts) rows of a **data-evolution** table
from a source, like SQL `MERGE INTO`. Matched rows are updated in place by
`_ROW_ID`; only the touched columns are rewritten. Requires `ray >= 2.50` and a
target table with `'data-evolution.enabled'` and `'row-tracking.enabled'` set.

```python
from pypaimon.ray import merge_into, WhenMatched, WhenNotMatched

metrics = merge_into(
target="database_name.table_name",
source=ray_dataset, # ray.data.Dataset / pa.Table / pandas / table-name str
catalog_options={"warehouse": "/path/to/warehouse"},
on=["id"], # or {"target_col": "source_col"} for renamed keys
when_matched=[WhenMatched(update={"score": "s.score"})], # or update="*"
when_not_matched=[WhenNotMatched(insert="*")], # optional
)
print(metrics) # {"num_updated": 3, "num_inserted": 2}
```

- `update` / `insert`: `"*"` (all columns from source), or a dict mapping target
columns to `"s.<col>"`, `"t.<col>"`, or a literal.
- `condition` (optional): a string expression over `s.<col>` / `t.<col>` using
`> < >= <= == != and or not`; only referenced columns are read. Example:
`WhenMatched(update={"score": "s.score"}, condition="s.version > t.version")`.

**Parameters:**
- `on`: key columns, or `{target_col: source_col}` for renamed keys.
- `num_partitions`: shuffle parallelism for the join and the write; defaults to
`max(16, cluster_cpus * 2)`, raise it for large merges.
- `ray_remote_args`, `concurrency`: scheduling for the insert path.
- `allow_multiple_matches`: if `False` (default), a target row matched by
multiple source rows raises; `True` keeps the first match.

**Returns:** `{"num_updated", "num_inserted"}`.

**Notes:**
- Blob columns cannot be updated and are never read into the join.
- Updating a globally-indexed column raises by default; set
`'global-index.column-update-action' = 'DROP_PARTITION_INDEX'` to drop the
affected index instead (rebuild afterwards).
- Cost scales with how many data files the updated rows touch; scattered updates
over a large table rewrite the updated column of many files.
5 changes: 3 additions & 2 deletions paimon-python/dev/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
duckdb==1.3.2
flake8==4.0.1
pytest~=7.0
# Ray: 2.48+ has no wheel for Python 3.8; use 2.10.0 on 3.8, 2.48.0 on 3.9+
ray>=2.10.0
# merge_into needs Dataset.join (added in Ray 2.50). Python 3.8 has no 2.50 wheel.
ray>=2.10.0; python_version < "3.9"
ray>=2.50.0; python_version >= "3.9"
requests
parameterized
# Vortex 0.71.0 regresses native predicate pushdown on single-row files.
Expand Down
13 changes: 13 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,16 @@ class CoreOptions:
)
)

GLOBAL_INDEX_COLUMN_UPDATE_ACTION: ConfigOption[str] = (
ConfigOptions.key("global-index.column-update-action")
.string_type()
.default_value("THROW_ERROR")
.with_description(
"Defines the action to take when an update modifies columns that "
"are covered by a global index. THROW_ERROR or DROP_PARTITION_INDEX."
)
)

LOCAL_CACHE_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("local-cache.enabled")
.boolean_type()
Expand Down Expand Up @@ -652,6 +662,9 @@ def row_tracking_enabled(self, default=None):
def data_evolution_enabled(self, default=None):
return self.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, default)

def global_index_column_update_action(self, default=None):
return self.options.get(CoreOptions.GLOBAL_INDEX_COLUMN_UPDATE_ACTION, default)

def deletion_vectors_enabled(self, default=None):
return self.options.get(CoreOptions.DELETION_VECTORS_ENABLED, default)

Expand Down
19 changes: 0 additions & 19 deletions paimon-python/pypaimon/manifest/index_manifest_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,3 @@ def __eq__(self, other):
def __hash__(self):
return hash((self.kind, tuple(self.partition.values),
self.bucket, self.index_file))


INDEX_MANIFEST_ENTRY = {
"type": "record",
"name": "IndexManifestEntry",
"fields": [
{"name": "_VERSION", "type": "int"},
{"name": "_KIND", "type": "byte"},
{"name": "_PARTITION", "type": "bytes"},
{"name": "_BUCKET", "type": "int"},
{"name": "_INDEX_TYPE", "type": "string"},
{"name": "_FILE_NAME", "type": "string"},
{"name": "_FILE_SIZE", "type": "long"},
{"name": "_ROW_COUNT", "type": "long"},
{"name": "_DELETIONS_VECTORS_RANGES", "type": {"type": "array", "elementType": "DeletionVectorMeta"}},
{"name": "_EXTERNAL_PATH", "type": ["null", "string"]},
{"name": "_GLOBAL_INDEX", "type": "GlobalIndexMeta"}
]
}
122 changes: 120 additions & 2 deletions paimon-python/pypaimon/manifest/index_manifest_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import uuid
from io import BytesIO
from typing import List, Optional

Expand All @@ -24,11 +25,60 @@
from pypaimon.index.deletion_vector_meta import DeletionVectorMeta
from pypaimon.index.index_file_meta import IndexFileMeta
from pypaimon.manifest.index_manifest_entry import IndexManifestEntry
from pypaimon.table.row.generic_row import GenericRowDeserializer
from pypaimon.table.row.generic_row import (GenericRowDeserializer,
GenericRowSerializer)
from pypaimon.utils.file_store_path_factory import FileStorePathFactory

_DELETION_VECTOR_META_SCHEMA = {
"type": "record",
"name": "DeletionVectorMeta",
"fields": [
{"name": "f0", "type": "string"},
{"name": "f1", "type": "long"},
{"name": "f2", "type": "int"},
{"name": "_CARDINALITY", "type": ["null", "long"], "default": None},
],
}

_GLOBAL_INDEX_META_SCHEMA = {
"type": "record",
"name": "GlobalIndexMeta",
"fields": [
{"name": "_ROW_RANGE_START", "type": "long"},
{"name": "_ROW_RANGE_END", "type": "long"},
{"name": "_INDEX_FIELD_ID", "type": "int"},
{"name": "_EXTRA_FIELD_IDS",
"type": ["null", {"type": "array", "items": "int"}], "default": None},
{"name": "_INDEX_META", "type": ["null", "bytes"], "default": None},
],
}

INDEX_MANIFEST_ENTRY_SCHEMA = {
"type": "record",
"name": "IndexManifestEntry",
"fields": [
{"name": "_VERSION", "type": "int"},
{"name": "_KIND", "type": "int"},
{"name": "_PARTITION", "type": "bytes"},
{"name": "_BUCKET", "type": "int"},
{"name": "_INDEX_TYPE", "type": "string"},
{"name": "_FILE_NAME", "type": "string"},
{"name": "_FILE_SIZE", "type": "long"},
{"name": "_ROW_COUNT", "type": "long"},
{"name": "_DELETIONS_VECTORS_RANGES",
"type": ["null", {"type": "array", "items": _DELETION_VECTOR_META_SCHEMA}],
"default": None},
{"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None},
{"name": "_GLOBAL_INDEX",
"type": ["null", _GLOBAL_INDEX_META_SCHEMA], "default": None},
],
}

_INDEX_ENTRY_VERSION = 1


class IndexManifestFile:
"""Index manifest file reader for reading index manifest entries."""
"""Index manifest file reader/writer for index manifest entries."""

DELETION_VECTORS_INDEX = "DELETION_VECTORS"

Expand Down Expand Up @@ -172,5 +222,73 @@ def _parse_global_index_meta(self, global_index_record) -> Optional[GlobalIndexM
row_range_start=global_index_record.get('_ROW_RANGE_START', 0),
row_range_end=global_index_record.get('_ROW_RANGE_END', 0),
index_field_id=global_index_record.get('_INDEX_FIELD_ID', 0),
extra_field_ids=global_index_record.get('_EXTRA_FIELD_IDS'),
index_meta=global_index_record.get('_INDEX_META')
)

def combine(
self,
previous_name: Optional[str],
deletes: List[IndexManifestEntry],
) -> Optional[str]:
"""Apply DELETE entries to the previous index manifest and write a new one.

Mirrors Java GlobalIndexCombiner: the stored manifest only holds ADD
entries; deleting means dropping the entries whose index file name
appears in *deletes*. Returns the new manifest file name, or
*previous_name* unchanged when there is nothing to delete.
"""
if not deletes:
return previous_name
previous = self.read(previous_name) if previous_name else []
delete_names = {e.index_file.file_name for e in deletes}
survivors = [e for e in previous if e.index_file.file_name not in delete_names]
return self.write(survivors)

def write(self, entries: List[IndexManifestEntry]) -> str:
"""Serialize *entries* to a new Avro index manifest, return its name."""
file_name = f"{FileStorePathFactory.INDEX_MANIFEST_PREFIX}{uuid.uuid4()}"
path = f"{self.manifest_path}/{file_name}"
records = [self._to_avro_record(e) for e in entries]
try:
buffer = BytesIO()
fastavro.writer(buffer, INDEX_MANIFEST_ENTRY_SCHEMA, records)
with self.file_io.new_output_stream(path) as output_stream:
output_stream.write(buffer.getvalue())
except Exception as e:
self.file_io.delete_quietly(path)
raise RuntimeError(f"Failed to write index manifest file: {e}") from e
return file_name

def _to_avro_record(self, entry: IndexManifestEntry) -> dict:
index_file = entry.index_file
dv_ranges = None
if index_file.dv_ranges:
dv_ranges = [
{"f0": dv.data_file_name, "f1": dv.offset, "f2": dv.length,
"_CARDINALITY": dv.cardinality}
for dv in index_file.dv_ranges.values()
]
global_index = None
if index_file.global_index_meta is not None:
gim = index_file.global_index_meta
global_index = {
"_ROW_RANGE_START": gim.row_range_start,
"_ROW_RANGE_END": gim.row_range_end,
"_INDEX_FIELD_ID": gim.index_field_id,
"_EXTRA_FIELD_IDS": gim.extra_field_ids,
"_INDEX_META": gim.index_meta,
}
return {
"_VERSION": _INDEX_ENTRY_VERSION,
"_KIND": entry.kind,
"_PARTITION": GenericRowSerializer.to_bytes(entry.partition),
"_BUCKET": entry.bucket,
"_INDEX_TYPE": index_file.index_type,
"_FILE_NAME": index_file.file_name,
"_FILE_SIZE": index_file.file_size,
"_ROW_COUNT": index_file.row_count,
"_DELETIONS_VECTORS_RANGES": dv_ranges,
"_EXTERNAL_PATH": index_file.external_path,
"_GLOBAL_INDEX": global_index,
}
13 changes: 12 additions & 1 deletion paimon-python/pypaimon/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,16 @@
# under the License.

from pypaimon.ray.ray_paimon import read_paimon, write_paimon
from pypaimon.ray.data_evolution_merge_into import (
WhenMatched,
WhenNotMatched,
merge_into,
)

__all__ = ["read_paimon", "write_paimon"]
__all__ = [
"read_paimon",
"write_paimon",
"merge_into",
"WhenMatched",
"WhenNotMatched",
]
Loading
Loading