diff --git a/paimon-python/pypaimon/daft/daft_datasource.py b/paimon-python/pypaimon/daft/daft_datasource.py index 457fae375c57..f63fd8f1c957 100644 --- a/paimon-python/pypaimon/daft/daft_datasource.py +++ b/paimon-python/pypaimon/daft/daft_datasource.py @@ -39,7 +39,6 @@ from pypaimon.common.predicate import Predicate from pypaimon.manifest.schema.data_file_meta import DataFileMeta - from pypaimon.read.table_read import TableRead from pypaimon.read.split import Split from pypaimon.table.file_store_table import FileStoreTable @@ -52,6 +51,8 @@ PAIMON_FILE_FORMAT_ORC = "orc" PAIMON_FILE_FORMAT_AVRO = "avro" +_PaimonIdentifier = tuple[str, str, str | None] + @dataclass(frozen=True, slots=True) class _ReadPushdownState: @@ -63,6 +64,84 @@ class _ReadPushdownState: source_limit: int | None +def _options_to_dict(options: Any) -> dict[str, Any]: + if options is None: + return {} + if isinstance(options, dict): + return dict(options) + return dict(options.to_map()) + + +def _extract_catalog_options(table: FileStoreTable) -> dict[str, Any]: + # Every FileIO exposes catalog properties via ``properties`` (CachingFileIO + # delegates to its wrapped FileIO), so no per-implementation handling needed. + return _options_to_dict(table.file_io.properties) + + +def _extract_identifier(table: FileStoreTable) -> _PaimonIdentifier | None: + identifier = table.identifier + if identifier is None: + return None + + database_name = identifier.get_database_name() + table_name = identifier.get_table_name() + if database_name is None or table_name is None: + return None + return database_name, table_name, identifier.get_branch_name() + + +def _extract_table_options(table: FileStoreTable) -> dict[str, Any]: + return _options_to_dict(table.schema().options) + + +def _to_paimon_identifier(identifier: _PaimonIdentifier) -> Any: + database_name, table_name, branch_name = identifier + if branch_name: + from pypaimon.common.identifier import Identifier + + return Identifier(database_name, table_name, branch_name) + return f"{database_name}.{table_name}" + + +def _load_table( + catalog_options: dict[str, Any], + table_identifier: _PaimonIdentifier | None, + table_path: str | None, + table_options: dict[str, Any], +) -> FileStoreTable: + if catalog_options and table_identifier is not None: + from pypaimon.catalog.catalog_factory import CatalogFactory + + catalog = CatalogFactory.create(catalog_options) + table = catalog.get_table(_to_paimon_identifier(table_identifier)) + elif table_path: + from pypaimon.table.file_store_table import FileStoreTable + + table = FileStoreTable.from_path(table_path) + else: + raise RuntimeError( + "Unable to reconstruct Paimon table while deserializing PaimonDataSource." + ) + + if table_options: + table = table.copy(table_options) + return table + + +def _build_storage_config( + catalog_options: dict[str, Any], + multithreaded_io: bool, +) -> StorageConfig: + from daft import context + from daft.daft import StorageConfig + + from pypaimon.daft.daft_io_config import _convert_paimon_catalog_options_to_io_config + + io_config = _convert_paimon_catalog_options_to_io_config(catalog_options) + io_config = io_config or context.get_context().daft_planning_config.default_io_config + return StorageConfig(multithreaded_io, io_config) + + class _PaimonPKSplitTask(DataSourceTask): """DataSourceTask for PK-table splits that require LSM-tree merge. @@ -73,15 +152,27 @@ class _PaimonPKSplitTask(DataSourceTask): def __init__( self, - table_read: TableRead, + table_catalog_options: dict[str, Any], + table_identifier: _PaimonIdentifier | None, + table_path: str | None, + table_options: dict[str, Any], split: Split, schema: Schema, + read_columns: list[str] | None = None, + limit: int | None = None, + predicate: Predicate | None = None, output_columns: list[str] | None = None, blob_column_names: set[str] | None = None, ) -> None: - self._table_read = table_read + self._table_catalog_options = table_catalog_options + self._table_identifier = table_identifier + self._table_path = table_path + self._table_options = table_options self._split = split self._schema = schema + self._read_columns = read_columns + self._limit = limit + self._predicate = predicate self._output_columns = output_columns self._blob_column_names = blob_column_names or set() @@ -90,7 +181,21 @@ def schema(self) -> Schema: return self._schema async def read(self) -> AsyncIterator[RecordBatch]: - reader = self._table_read.to_arrow_batch_reader([self._split]) + table = _load_table( + self._table_catalog_options, + self._table_identifier, + self._table_path, + self._table_options, + ) + read_builder = table.new_read_builder() + if self._read_columns is not None: + read_builder = read_builder.with_projection(self._read_columns) + if self._limit is not None: + read_builder = read_builder.with_limit(self._limit) + if self._predicate is not None: + read_builder = read_builder.with_filter(self._predicate) + + reader = read_builder.new_read().to_arrow_batch_reader([self._split]) for batch in iter(reader.read_next_batch, None): if self._output_columns is not None: batch = batch.select(self._output_columns) @@ -151,9 +256,55 @@ def __init__( storage_config: StorageConfig, catalog_options: dict[str, str], ) -> None: - self._table = table self._storage_config = storage_config - self._catalog_options = catalog_options + self._catalog_options = dict(catalog_options or {}) + self._table_catalog_options = { + **_extract_catalog_options(table), + **self._catalog_options, + } + self._table_identifier = _extract_identifier(table) + table_path = getattr(table, "table_path", None) + self._table_path = str(table_path) if table_path is not None else None + self._table_options = _extract_table_options(table) + self._paimon_predicate: Predicate | None = None + self._remaining_filters: list[PyExpr] | None = None + self._init_table(table) + + def __getstate__(self) -> dict[str, Any]: + return { + "_multithreaded_io": self._storage_config.multithreaded_io, + "_catalog_options": self._catalog_options, + "_table_catalog_options": self._table_catalog_options, + "_table_identifier": self._table_identifier, + "_table_path": self._table_path, + "_table_options": self._table_options, + "_paimon_predicate": self._paimon_predicate, + "_remaining_filters": self._remaining_filters, + } + + def __setstate__(self, state: dict[str, Any]) -> None: + self._catalog_options = state["_catalog_options"] + self._table_catalog_options = state["_table_catalog_options"] + self._table_identifier = state["_table_identifier"] + self._table_path = state["_table_path"] + self._table_options = state["_table_options"] + self._paimon_predicate = state["_paimon_predicate"] + self._remaining_filters = state["_remaining_filters"] + self._storage_config = _build_storage_config( + self._table_catalog_options, + state["_multithreaded_io"], + ) + + table = _load_table( + self._table_catalog_options, + self._table_identifier, + self._table_path, + self._table_options, + ) + self._init_table(table) + + def _init_table(self, table: FileStoreTable) -> None: + self._table = table from pypaimon.schema.data_types import PyarrowFieldParser @@ -177,7 +328,11 @@ def __init__( else: self._schema = Schema.from_pyarrow_schema(pa_schema) - warehouse = catalog_options.get("warehouse", "") + warehouse = ( + self._catalog_options.get("warehouse") + or self._table_catalog_options.get("warehouse") + or "" + ) self._warehouse_scheme = urlparse(warehouse).scheme self._file_format = table.options.file_format().lower() @@ -189,9 +344,6 @@ def __init__( else {} ) - self._paimon_predicate: Predicate | None = None - self._remaining_filters: list[PyExpr] | None = None - @property def name(self) -> str: table_path = getattr(self._table, "table_path", None) @@ -270,6 +422,22 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] _deletion_files = getattr(split, "data_deletion_files", None) has_deletion_vectors = _deletion_files is not None and any(df is not None for df in _deletion_files) + # Reader selection per split. + # + # Daft's native Parquet reader (the fast path) can read a split only + # when ALL of the following hold: + # * files are Parquet (ORC/Avro are not handled natively); + # * no blob columns (these need pypaimon's File assembly); + # * no LSM merge required -- either an append-only table, or a + # primary-key split that is raw_convertible (no overlapping levels); + # * no deletion vectors on the split (must be applied by pypaimon). + # + # Otherwise the split is read by the pypaimon reader task + # (_PaimonPKSplitTask). Both the source and that task serialize only + # rebuildable metadata (catalog options, identifier, table path), so + # either path is safe under the Ray runner on remote filesystems + # (OSS/Jindo): the native task carries Daft's own picklable + # StorageConfig, and the pypaimon task reopens the table on the worker. can_use_native_reader = ( self._is_parquet and not self._has_blob_columns @@ -311,14 +479,15 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] reason, ) yield _PaimonPKSplitTask( - self._fallback_read_builder( - read_table, - read_pushdowns.read_columns, - read_pushdowns.source_limit, - read_pushdowns.reader_predicate, - ).new_read(), + self._table_catalog_options, + self._table_identifier, + self._table_path, + _extract_table_options(read_table), split, self._project_schema(read_pushdowns.task_columns), + read_pushdowns.read_columns, + read_pushdowns.source_limit, + read_pushdowns.reader_predicate, read_pushdowns.task_columns, self._blob_column_names, ) @@ -412,22 +581,6 @@ def _project_schema(self, columns: list[str] | None) -> Schema: [(name, field_map[name].dtype) for name in columns if name in field_map] ) - def _fallback_read_builder( - self, - table: FileStoreTable, - read_columns: list[str] | None, - limit: int | None, - predicate: Predicate | None, - ) -> Any: - read_builder = table.new_read_builder() - if read_columns is not None: - read_builder = read_builder.with_projection(read_columns) - if limit is not None: - read_builder = read_builder.with_limit(limit) - if predicate is not None: - read_builder = read_builder.with_filter(predicate) - return read_builder - def _read_pushdown_state( self, table: FileStoreTable, diff --git a/paimon-python/pypaimon/filesystem/caching_file_io.py b/paimon-python/pypaimon/filesystem/caching_file_io.py index 10605be5f93d..fe8053421957 100644 --- a/paimon-python/pypaimon/filesystem/caching_file_io.py +++ b/paimon-python/pypaimon/filesystem/caching_file_io.py @@ -356,6 +356,10 @@ def wrap_with_caching_if_needed(file_io, options, cache=None): return file_io return CachingFileIO(file_io, cache, whitelist) + @property + def properties(self): + return self._delegate.properties + def new_input_stream(self, path: str): file_type = FileType.classify(path) if self._cache is None or file_type not in self._whitelist or FileType.is_mutable(path): diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 67be2587b7e8..9d418cbb26bb 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -81,6 +81,10 @@ def from_path(cls, table_path: str) -> 'FileStoreTable': return cls(file_io, identifier, table_path, table_schema) + def schema(self) -> TableSchema: + """Get the table schema.""" + return self.table_schema + def current_branch(self) -> str: """Get the current branch name from the identifier.""" return self.identifier.get_branch_name_or_default() diff --git a/paimon-python/pypaimon/tests/daft/daft_data_test.py b/paimon-python/pypaimon/tests/daft/daft_data_test.py index 9d7795cb9797..2013e1c123cc 100644 --- a/paimon-python/pypaimon/tests/daft/daft_data_test.py +++ b/paimon-python/pypaimon/tests/daft/daft_data_test.py @@ -44,6 +44,18 @@ # --------------------------------------------------------------------------- +class _UnserializableFileIoMarker: + def __reduce__(self): + raise TypeError("file io marker should not be serialized") + + +class _UnserializableStorageConfig: + multithreaded_io = False + + def __reduce__(self): + raise TypeError("storage config marker should not be serialized") + + def _write_to_paimon(table, arrow_table, mode="append", overwrite_partition=None): write_builder = table.new_batch_write_builder() if mode == "overwrite": @@ -191,6 +203,104 @@ def test_read_paimon_schema_matches(append_only_table): assert "dt" in schema.column_names() +def test_read_paimon_source_is_serializable(append_only_table): + """The Daft source must not serialize live table/file_io/storage objects.""" + from daft.pickle import dumps, loads + + from pypaimon.daft.daft_datasource import PaimonDataSource + + table, _ = append_only_table + table.file_io._unserializable_marker = _UnserializableFileIoMarker() + + source = PaimonDataSource( + table, + storage_config=_UnserializableStorageConfig(), + catalog_options={}, + ) + + restored = loads(dumps(source)) + + assert restored is not source + assert restored.schema.column_names() == source.schema.column_names() + assert restored._table is not table + assert restored._table.identifier.get_full_name() == table.identifier.get_full_name() + assert restored._storage_config.multithreaded_io is False + + +def test_read_paimon_remote_ray_task_is_serializable(pk_table, monkeypatch): + """A fallback PK split task must reopen the table from metadata on Ray workers. + + Splits that need an LSM merge (here, overlapping primary-key writes) are read + by the pypaimon reader task. Under the Ray runner that task is pickled to + remote workers, so it must serialize only rebuildable metadata -- never the + live table / file_io / storage objects. + """ + from daft import runners + from daft.io.pushdowns import Pushdowns + from daft.pickle import dumps, loads + + from pypaimon.daft.daft_datasource import PaimonDataSource + + class _RayRunner: + name = "ray" + + table, _ = pk_table + # Two overlapping writes on id=1 create non-raw-convertible splits that + # require the pypaimon merge reader (the fallback _PaimonPKSplitTask). + _write_to_paimon( + table, + pa.table( + { + "id": pa.array([1, 2], pa.int64()), + "name": pa.array(["old_a", "old_b"], pa.string()), + "dt": pa.array(["2024-01-01", "2024-01-01"], pa.string()), + } + ), + ) + _write_to_paimon( + table, + pa.table( + { + "id": pa.array([1], pa.int64()), + "name": pa.array(["new_a"], pa.string()), + "dt": pa.array(["2024-01-01"], pa.string()), + } + ), + ) + table.file_io._unserializable_marker = _UnserializableFileIoMarker() + + source = PaimonDataSource( + table, + storage_config=_UnserializableStorageConfig(), + catalog_options={}, + ) + monkeypatch.setattr(runners, "get_or_create_runner", lambda: _RayRunner()) + + async def first_task(): + async for task in source.get_tasks(Pushdowns()): + return task + raise AssertionError("Expected at least one task") + + async def read_task(task): + rows = [] + async for batch in task.read(): + rows.append(batch.to_pydict()) + return rows + + task = asyncio.run(first_task()) + assert type(task).__name__ == "_PaimonPKSplitTask" + + restored_task = loads(dumps(task)) + batches = asyncio.run(read_task(restored_task)) + + merged = { + _id: name + for batch in batches + for _id, name in zip(batch["id"], batch["name"]) + } + assert merged == {1: "new_a", 2: "old_b"} + + # --------------------------------------------------------------------------- # Multi-partition reads # ---------------------------------------------------------------------------