[WIP][ray] Ray merge into #8028
Draft
XiaoHongbo-Hope wants to merge 32 commits into
Pythonic MERGE INTO on Ray Datasets, mirroring Spark/Flink merge-into.
UPSERT-flavored clauses (matched-update, not-matched-insert,
not-matched-by-source-update) supported; DELETE raises NotImplementedError
pending KeyValueDataWriter row-kind work.
API:

    from pypaimon.ray import merge_paimon

    merge_paimon(target, source, catalog_options,
                 on=[...],
                 when_matched_update={...},
                 when_not_matched_insert="*")
Algorithm: read target -> tag _side -> union -> groupby(on).map_groups
to classify matched/not-matched and apply SET; write back via write_paimon
(PK upsert through _SEQUENCE_NUMBER).
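For reviewers, the tag -> union -> group -> classify flow above can be pictured with a toy, in-memory sketch. This is not the PR's code: `merge_rows` is a hypothetical helper, plain dicts stand in for Ray blocks, and the SET spec is assumed to map a target column to the source column that supplies its value.

```python
from collections import defaultdict

def merge_rows(target_rows, source_rows, on, when_matched_update, insert_not_matched=True):
    """Toy, in-memory version of tag -> union -> group -> classify (illustration only)."""
    # Tag each row with the side it came from, then union both sides.
    tagged = [("target", r) for r in target_rows] + [("source", r) for r in source_rows]

    # Group by the ON key; this stands in for groupby(on).map_groups.
    groups = defaultdict(lambda: {"target": [], "source": []})
    for side, row in tagged:
        key = tuple(row[c] for c in on)
        groups[key][side].append(row)

    out = []
    for sides in groups.values():
        tgt, src = sides["target"], sides["source"]
        if tgt and src:
            # Matched: apply SET (assumed form: target column <- source column).
            updated = dict(tgt[0])
            for tgt_col, src_col in when_matched_update.items():
                updated[tgt_col] = src[0][src_col]
            out.append(updated)
        elif src and insert_not_matched:
            # Not matched: insert the source row unchanged (the "*" spec).
            out.append(dict(src[0]))
        # Target-only rows need no rewrite in an upsert, so they are skipped.
    return out
```

Only the rows in the returned list would be written back; on a primary-key table the upsert then replaces the old versions of the matched keys.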
Known bugs to fix in follow-up:
- _schema_type_map referenced but never defined (NameError on call)
- for f in batch.schema iterates pa.Schema (TypeError on pyarrow >= 18)
- type-mismatch fallback to pa.null() destroys join keys
- test helper _make_pk_table_with_flag returns 1 value, test unpacks 2
- _schema_type_map called but undefined: NameError on any cross-schema merge.
- for f in batch.schema raises TypeError on pyarrow >= 18.
- type-mismatch fallback to pa.null() drops join key values.
- _make_pk_table_with_flag returned 1 value but caller unpacks 2.
…rop API
- pa.Table.drop deprecated in newer pyarrow; switch to drop_columns.
- matched branch silently produced a cartesian product on multiple source rows.
- _required_target_cols_for_passthrough widened projection to all columns when its spec was None, defeating the projection optimization.
Replace the driver-side matched-id set collection in the not-matched INSERT path with a distributed left_anti join on the per-row id, matching Spark's single LeftAnti predicate. Partition count is sized to the matched row count to keep hash partitions dense, since ray's join fails on empty partitions.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Reject multi-source cardinality by default; add allow_multiple_matches opt-in for deterministic keep-first.
- Refuse blob-column writes loudly instead of emitting wrong-format files.
- Check Dataset.join (ray>=2.50) at call time and restore the ray extra floor to 2.10, so read/sink users on older ray are unaffected.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
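The cardinality rule in the first bullet can be sketched as below. This is an assumption-laden illustration, not the PR's implementation: `pick_source_per_target` is a hypothetical helper, it runs on plain Python rows, and "first" is taken to mean first in source iteration order.

```python
def pick_source_per_target(target_keys, source_rows, on, allow_multiple_matches=False):
    """Return {key: source_row} for keys that exist in the target (illustration only)."""
    target_keys = set(target_keys)
    chosen = {}
    for row in source_rows:  # source order decides which row counts as "first"
        key = tuple(row[c] for c in on)
        if key not in target_keys:
            continue  # not-matched rows are outside the cardinality check
        if key in chosen:
            if not allow_multiple_matches:
                raise ValueError(f"multiple source rows match target key {key!r}")
            continue  # opt-in keep-first: later duplicates are ignored
        chosen[key] = row
    return chosen
```

Failing by default avoids silently picking one of several candidate updates; the opt-in trades that safety for a deterministic choice.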
API Design Recommendations for Paimon Ray merge_into from AI: Based on the design patterns from Delta Lake, Lance, Iceberg/Ray, and Paimon Spark's MergeIntoPaimonDataEvolutionTable:
The update path grouped by _FIRST_ROW_ID with ray's default 200 hash partitions, spawning hundreds of empty reduce tasks on small and medium merges. Cap the groupby partitions at the distinct group count (one per target data file), bounded by 200 so large tables keep today's behavior. Verified on a 2-node ray cluster: an 18000-row merge drops the shuffle from num_partitions=200 to 4 with no correctness change.
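The sizing rule described in this commit amounts to a clamp. A minimal sketch, assuming a hypothetical helper name and an added floor of 1 (the floor is my assumption, so a merge with no groups still gets a valid partition count):

```python
def groupby_partitions(distinct_groups, default_partitions=200):
    """Cap shuffle partitions at the number of distinct groups, bounded by the default."""
    # The lower bound of 1 is an assumption for this sketch, not taken from the commit.
    return max(1, min(distinct_groups, default_partitions))
```

With 4 distinct groups this returns 4, which lines up with the 200 -> 4 drop reported for the 18000-row merge; anything above 200 groups stays at 200.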
_assign_frid mapped each matched _ROW_ID to its file's first-row-id with a per-row Python bisect over to_pylist(), a CPU hot spot when many rows match. Replace it with a single numpy searchsorted over the matched batch, keeping the null and out-of-range guards. Verified: 26 ray merge_into unit tests pass; an 18000-row merge on a 2-node ray cluster stays correct.
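A vectorized lookup of this kind might look like the sketch below. It is not the PR's `_assign_frid`: the function name, the `-1` sentinel, and the separate per-file row counts are assumptions, it expects the first-row-ids sorted ascending with at least one file, and null handling (which the real code keeps) is omitted.

```python
import numpy as np

def assign_first_row_id(row_ids, file_first_row_ids, file_row_counts):
    """Map each row id to the first-row-id of the file covering it, or -1 if none does."""
    firsts = np.asarray(file_first_row_ids, dtype=np.int64)  # sorted ascending
    counts = np.asarray(file_row_counts, dtype=np.int64)
    ids = np.asarray(row_ids, dtype=np.int64)

    # Index of the last file whose first-row-id is <= the row id.
    idx = np.searchsorted(firsts, ids, side="right") - 1

    valid = idx >= 0                     # guard: id sorts before the first file
    safe = np.where(valid, idx, 0)       # avoid negative indexing for invalid ids
    valid &= ids < firsts[safe] + counts[safe]  # guard: id past the end of its file
    return np.where(valid, firsts[safe], -1)
```

One `searchsorted` call replaces a Python-level bisect per row, so the cost moves from the interpreter loop into NumPy.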
When both when_matched and when_not_matched run on a non-empty target, build the UPDATE and INSERT datasets from one materialized LEFT_OUTER join instead of reading and shuffling the target table twice. The join shuffle dominates cost at scale, so routing matched (non-null target) and not-matched (null target) rows from a single join halves it.
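The routing idea can be shown on plain Python rows. This is a sketch under assumptions, not the Ray implementation: `route_outer_join` is a hypothetical name, the source is the left side of the join, target keys are assumed unique, and a dict lookup stands in for the distributed shuffle.

```python
def route_outer_join(target_rows, source_rows, on):
    """Split one source-LEFT-OUTER-target join into UPDATE pairs and INSERT rows."""
    # Assumes ON keys are unique in the target (as for a primary-key table).
    target_by_key = {tuple(r[c] for c in on): r for r in target_rows}

    matched, not_matched = [], []
    for src in source_rows:
        tgt = target_by_key.get(tuple(src[c] for c in on))
        if tgt is None:
            not_matched.append(src)      # null target side -> INSERT candidate
        else:
            matched.append((tgt, src))   # non-null target side -> UPDATE candidate
    return matched, not_matched
```

Both outputs come from a single pass over the join result, which is the point of the change: the target is read and shuffled once instead of once per clause.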
Add two cases for the single-outer-join path: combined update+insert under a merge_condition, and a matched clause-level condition with no merge_condition (the full-target-read branch).
…pty-target insert
- '*' SET spec resolves a renamed ON key via the source column, preventing NULL writes into the key column.
- Inserting into an empty target skips all joins, avoiding ray's empty hash-partition crash.
- Replace the zip(range) src-index with a deterministic per-block running offset: no realignment shuffle, no extra full copy, and no shuffle-materialize barrier (which deadlocked ray at <=2 CPUs). Handles both pandas- and arrow-backed source blocks.
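The running-offset idea in the last bullet can be sketched without Ray. Everything here is illustrative: `block_offsets`, `index_block`, and the `_src_idx` column name are hypothetical, and lists of dicts stand in for pandas- or arrow-backed blocks.

```python
def block_offsets(block_sizes):
    """Starting index of each block: the running sum of the sizes before it."""
    offsets, total = [], 0
    for n in block_sizes:
        offsets.append(total)
        total += n
    return offsets

def index_block(block_rows, offset):
    """Attach a global source index to each row using only the block's own offset."""
    return [dict(row, _src_idx=offset + i) for i, row in enumerate(block_rows)]
```

Each block needs only its own offset, so indices can be assigned block by block, and the result is deterministic as long as the block order is.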
… not a fixed 200 cap
… skip blob by default
…on, return metrics
Purpose
Tests