[python][daft] Make Daft Paimon read source serializable#8029
[python][daft] Make Daft Paimon read source serializable#8029kerwin-zk wants to merge 3 commits into
Conversation
| file_io = getattr(table, "file_io", None) | ||
| properties = getattr(file_io, "properties", None) | ||
| if properties is None: | ||
| properties = getattr(file_io, "catalog_options", None) |
There was a problem hiding this comment.
We can all obtain properties information through the properties attribute, even for RESTTokenFileIO. However, we need to determine whether it is CachingFileIO (in which case it needs to be obtained from _delegate) to increase code robustness.
I further recommend unifying the abstract properties API for FileIO on the pypaimon side. Because we currently have too many getattr calls.
There was a problem hiding this comment.
Done. Added a properties property on CachingFileIO that delegates to its _delegate, so every FileIO implementation now exposes .properties uniformly. _extract_catalog_options reads table.file_io.properties directly, with no per-implementation getattr.
| if identifier is None: | ||
| return None | ||
|
|
||
| get_database_name = getattr(identifier, "get_database_name", None) |
There was a problem hiding this comment.
Maybe it's fine to call identifier.get_database_name and identifier.get_table_name directly, not though getattr.
There was a problem hiding this comment.
Done, calling identifier.get_database_name() / get_table_name() / get_branch_name() directly now. This also fixes a latent issue: the old getattr fallback used identifier.object, which is the encoded object name and would round-trip incorrectly for branch tables.
|
|
||
|
|
||
| def _extract_table_options(table: FileStoreTable) -> dict[str, Any]: | ||
| table_schema = getattr(table, "table_schema", None) |
There was a problem hiding this comment.
Maybe let's define schema method in FileStoreTable.
There was a problem hiding this comment.
Done. Added FileStoreTable.schema() returning the TableSchema, and _extract_table_options now uses table.schema().options.
| ) | ||
|
|
||
| if can_use_native_reader: | ||
| use_paimon_reader_task = ( |
There was a problem hiding this comment.
Please provide detailed notes here regarding the scenarios in which the Daft Native Reader can be used, and those in which the Paimon Reader is necessary.
There was a problem hiding this comment.
Added detailed comments above the reader-selection logic in get_tasks, describing when the Daft native Parquet reader is used and when the pypaimon reader task is required (non-Parquet, blob columns, LSM merge, or deletion vectors).
| return not self._is_parquet or self._has_blob_columns or self._table.is_primary_key_table | ||
|
|
||
| def _requires_serializable_paimon_reader_task(self) -> bool: | ||
| if self._warehouse_scheme in ("", "file"): |
There was a problem hiding this comment.
In ray environment, to scan a normal append-only paimon table location on aliyun oss, we expect this can run in daft native reader way. But when _warehouse_scheme of this case is oss, this method will return true, and use_paimon_reader_task will return true. This does not meet our expectations. Correct me If i'm wrong please.
There was a problem hiding this comment.
You're right, thanks. I removed the gate: a normal append-only Parquet table on OSS now goes through Daft's native reader under the Ray runner. This is safe because both the source and the pypaimon fallback task serialize only rebuildable metadata (catalog options, identifier, table path), and the native task carries Daft's own picklable StorageConfig. The pypaimon reader task is now used only for splits that genuinely need it (PK/LSM merge, non-Parquet, blob columns, deletion vectors). I verified end-to-end that community Daft reading an append-only Parquet table on OSS under the Ray runner uses the native reader and returns correct results.
|
@kerwin-zk Thank you for working on this. I left some comments. |
Purpose
Make the Daft Paimon read source serializable when running with Ray.
Previously,
PaimonDataSourceand fallback read tasks could retain liveFileStoreTable,FileIO,StorageConfig, orTableReadobjects. Withremote filesystems such as OSS/Jindo, Ray failed to serialize the execution
plan because those objects may contain non-picklable PyArrow filesystem state.
Tests
CI