Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 185 additions & 32 deletions paimon-python/pypaimon/daft/daft_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

from pypaimon.common.predicate import Predicate
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.table_read import TableRead
from pypaimon.read.split import Split
from pypaimon.table.file_store_table import FileStoreTable

Expand All @@ -52,6 +51,8 @@
PAIMON_FILE_FORMAT_ORC = "orc"
PAIMON_FILE_FORMAT_AVRO = "avro"

_PaimonIdentifier = tuple[str, str, str | None]


@dataclass(frozen=True, slots=True)
class _ReadPushdownState:
Expand All @@ -63,6 +64,84 @@ class _ReadPushdownState:
source_limit: int | None


def _options_to_dict(options: Any) -> dict[str, Any]:
    """Return a fresh ``dict`` copy of an options container.

    Args:
        options: ``None``, a mapping, or an object exposing ``to_map()``
            whose result ``dict()`` accepts.

    Returns:
        A new dictionary that never aliases the input; empty when
        ``options`` is ``None``.

    Raises:
        AttributeError: If ``options`` is neither ``None``, a mapping, nor an
            object with a ``to_map()`` method.
    """
    from collections.abc import Mapping

    if options is None:
        return {}
    if isinstance(options, dict):
        return dict(options)
    # Read-only or custom mappings (e.g. ``MappingProxyType``) have no
    # ``to_map()``; copy them directly instead of failing with AttributeError.
    # Objects that do provide ``to_map()`` keep using their own conversion.
    if isinstance(options, Mapping) and not hasattr(options, "to_map"):
        return dict(options)
    return dict(options.to_map())


def _extract_catalog_options(table: FileStoreTable) -> dict[str, Any]:
    """Copy the catalog properties exposed by the table's FileIO into a dict."""
    # ``properties`` is available on every FileIO (CachingFileIO forwards to
    # the FileIO it wraps), so one code path covers all implementations.
    catalog_properties = table.file_io.properties
    return _options_to_dict(catalog_properties)


def _extract_identifier(table: FileStoreTable) -> _PaimonIdentifier | None:
    """Return ``(database, table, branch)`` for ``table``, or ``None``.

    ``None`` is returned when the table has no identifier, or when the
    identifier reports no database name or no table name. The branch entry is
    whatever ``get_branch_name()`` returns.
    """
    ident = table.identifier
    if ident is None:
        return None

    db, name = ident.get_database_name(), ident.get_table_name()
    if db is not None and name is not None:
        return db, name, ident.get_branch_name()
    return None


def _extract_table_options(table: FileStoreTable) -> dict[str, Any]:
    """Copy the options stored on the table's schema into a plain dict."""
    schema_options = table.schema().options
    return _options_to_dict(schema_options)


def _to_paimon_identifier(identifier: _PaimonIdentifier) -> Any:
    """Convert an identifier tuple into the value handed to ``get_table``.

    Returns a ``"database.table"`` string when the branch is empty or
    ``None``; otherwise an ``Identifier`` that also carries the branch.
    """
    db, name, branch = identifier
    if not branch:
        return f"{db}.{name}"

    # Imported lazily: only branch-qualified identifiers need the class.
    from pypaimon.common.identifier import Identifier

    return Identifier(db, name, branch)


def _load_table(
    catalog_options: dict[str, Any],
    table_identifier: _PaimonIdentifier | None,
    table_path: str | None,
    table_options: dict[str, Any],
) -> FileStoreTable:
    """Reopen a Paimon table from rebuildable metadata.

    The catalog route is used when ``catalog_options`` is non-empty and an
    identifier is available; otherwise the table is opened directly from
    ``table_path``. Non-empty ``table_options`` are applied with
    ``table.copy``.

    Raises:
        RuntimeError: If neither the catalog route nor a table path is usable.
    """
    via_catalog = bool(catalog_options) and table_identifier is not None
    if not via_catalog and not table_path:
        raise RuntimeError(
            "Unable to reconstruct Paimon table while deserializing PaimonDataSource."
        )

    if via_catalog:
        from pypaimon.catalog.catalog_factory import CatalogFactory

        loaded = CatalogFactory.create(catalog_options).get_table(
            _to_paimon_identifier(table_identifier)
        )
    else:
        from pypaimon.table.file_store_table import FileStoreTable

        loaded = FileStoreTable.from_path(table_path)

    return loaded.copy(table_options) if table_options else loaded


def _build_storage_config(
    catalog_options: dict[str, Any],
    multithreaded_io: bool,
) -> StorageConfig:
    """Create a Daft ``StorageConfig`` from Paimon catalog options.

    The catalog options are converted to an IO config; when that conversion
    yields a falsy value, the default IO config of Daft's current planning
    config is used instead.
    """
    from daft import context
    from daft.daft import StorageConfig

    from pypaimon.daft.daft_io_config import _convert_paimon_catalog_options_to_io_config

    resolved_io_config = _convert_paimon_catalog_options_to_io_config(catalog_options)
    if not resolved_io_config:
        resolved_io_config = context.get_context().daft_planning_config.default_io_config
    return StorageConfig(multithreaded_io, resolved_io_config)


class _PaimonPKSplitTask(DataSourceTask):
"""DataSourceTask for PK-table splits that require LSM-tree merge.

Expand All @@ -73,15 +152,27 @@ class _PaimonPKSplitTask(DataSourceTask):

def __init__(
    self,
    table_read: TableRead,
    table_catalog_options: dict[str, Any],
    table_identifier: _PaimonIdentifier | None,
    table_path: str | None,
    table_options: dict[str, Any],
    split: Split,
    schema: Schema,
    read_columns: list[str] | None = None,
    limit: int | None = None,
    predicate: Predicate | None = None,
    output_columns: list[str] | None = None,
    blob_column_names: set[str] | None = None,
) -> None:
    """Store the inputs needed to read one split.

    Every argument is kept on a same-named private attribute.
    ``blob_column_names`` is replaced by an empty set when it is ``None`` or
    empty.
    """
    # NOTE(review): ``table_read`` is shown here next to the table metadata
    # (catalog options, identifier, path, options); confirm both are
    # intended to be carried by the task.
    self._table_read = table_read

    # Metadata describing which table to read.
    self._table_catalog_options = table_catalog_options
    self._table_identifier = table_identifier
    self._table_path = table_path
    self._table_options = table_options

    # The unit of work and the schema this task reports.
    self._split = split
    self._schema = schema

    # Read pushdowns.
    self._read_columns = read_columns
    self._limit = limit
    self._predicate = predicate

    # Output shaping.
    self._output_columns = output_columns
    self._blob_column_names = blob_column_names if blob_column_names else set()

Expand All @@ -90,7 +181,21 @@ def schema(self) -> Schema:
return self._schema

async def read(self) -> AsyncIterator[RecordBatch]:
reader = self._table_read.to_arrow_batch_reader([self._split])
table = _load_table(
self._table_catalog_options,
self._table_identifier,
self._table_path,
self._table_options,
)
read_builder = table.new_read_builder()
if self._read_columns is not None:
read_builder = read_builder.with_projection(self._read_columns)
if self._limit is not None:
read_builder = read_builder.with_limit(self._limit)
if self._predicate is not None:
read_builder = read_builder.with_filter(self._predicate)

reader = read_builder.new_read().to_arrow_batch_reader([self._split])
for batch in iter(reader.read_next_batch, None):
if self._output_columns is not None:
batch = batch.select(self._output_columns)
Expand Down Expand Up @@ -151,9 +256,55 @@ def __init__(
storage_config: StorageConfig,
catalog_options: dict[str, str],
) -> None:
self._table = table
self._storage_config = storage_config
self._catalog_options = catalog_options
self._catalog_options = dict(catalog_options or {})
self._table_catalog_options = {
**_extract_catalog_options(table),
**self._catalog_options,
}
self._table_identifier = _extract_identifier(table)
table_path = getattr(table, "table_path", None)
self._table_path = str(table_path) if table_path is not None else None
self._table_options = _extract_table_options(table)
self._paimon_predicate: Predicate | None = None
self._remaining_filters: list[PyExpr] | None = None
self._init_table(table)

def __getstate__(self) -> dict[str, Any]:
    """Build the pickled state from rebuildable metadata only.

    The table object is not included, and the storage config is reduced to
    its ``multithreaded_io`` flag. The remaining entries are copied from the
    same-named instance attributes.
    """
    carried_over = (
        "_catalog_options",
        "_table_catalog_options",
        "_table_identifier",
        "_table_path",
        "_table_options",
        "_paimon_predicate",
        "_remaining_filters",
    )
    state: dict[str, Any] = {
        "_multithreaded_io": self._storage_config.multithreaded_io,
    }
    state.update((attr, getattr(self, attr)) for attr in carried_over)
    return state

def __setstate__(self, state: dict[str, Any]) -> None:
    """Restore the instance from the state produced by ``__getstate__``.

    The plain metadata attributes are copied back first. The storage config
    is then rebuilt from the table catalog options and the saved
    ``_multithreaded_io`` flag, the table is reopened, and ``_init_table``
    is run on it.
    """
    for attr in (
        "_catalog_options",
        "_table_catalog_options",
        "_table_identifier",
        "_table_path",
        "_table_options",
        "_paimon_predicate",
        "_remaining_filters",
    ):
        setattr(self, attr, state[attr])

    self._storage_config = _build_storage_config(
        self._table_catalog_options,
        state["_multithreaded_io"],
    )

    reopened = _load_table(
        self._table_catalog_options,
        self._table_identifier,
        self._table_path,
        self._table_options,
    )
    self._init_table(reopened)

def _init_table(self, table: FileStoreTable) -> None:
self._table = table

from pypaimon.schema.data_types import PyarrowFieldParser

Expand All @@ -177,7 +328,11 @@ def __init__(
else:
self._schema = Schema.from_pyarrow_schema(pa_schema)

warehouse = catalog_options.get("warehouse", "")
warehouse = (
self._catalog_options.get("warehouse")
or self._table_catalog_options.get("warehouse")
or ""
)
self._warehouse_scheme = urlparse(warehouse).scheme

self._file_format = table.options.file_format().lower()
Expand All @@ -189,9 +344,6 @@ def __init__(
else {}
)

self._paimon_predicate: Predicate | None = None
self._remaining_filters: list[PyExpr] | None = None

@property
def name(self) -> str:
table_path = getattr(self._table, "table_path", None)
Expand Down Expand Up @@ -270,6 +422,22 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]
_deletion_files = getattr(split, "data_deletion_files", None)
has_deletion_vectors = _deletion_files is not None and any(df is not None for df in _deletion_files)

# Reader selection per split.
#
# Daft's native Parquet reader (the fast path) can read a split only
# when ALL of the following hold:
# * files are Parquet (ORC/Avro are not handled natively);
# * no blob columns (these need pypaimon's File assembly);
# * no LSM merge required -- either an append-only table, or a
# primary-key split that is raw_convertible (no overlapping levels);
# * no deletion vectors on the split (must be applied by pypaimon).
#
# Otherwise the split is read by the pypaimon reader task
# (_PaimonPKSplitTask). Both the source and that task serialize only
# rebuildable metadata (catalog options, identifier, table path), so
# either path is safe under the Ray runner on remote filesystems
# (OSS/Jindo): the native task carries Daft's own picklable
# StorageConfig, and the pypaimon task reopens the table on the worker.
can_use_native_reader = (
self._is_parquet
and not self._has_blob_columns
Expand Down Expand Up @@ -311,14 +479,15 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]
reason,
)
yield _PaimonPKSplitTask(
self._fallback_read_builder(
read_table,
read_pushdowns.read_columns,
read_pushdowns.source_limit,
read_pushdowns.reader_predicate,
).new_read(),
self._table_catalog_options,
self._table_identifier,
self._table_path,
_extract_table_options(read_table),
split,
self._project_schema(read_pushdowns.task_columns),
read_pushdowns.read_columns,
read_pushdowns.source_limit,
read_pushdowns.reader_predicate,
read_pushdowns.task_columns,
self._blob_column_names,
)
Expand Down Expand Up @@ -412,22 +581,6 @@ def _project_schema(self, columns: list[str] | None) -> Schema:
[(name, field_map[name].dtype) for name in columns if name in field_map]
)

def _fallback_read_builder(
    self,
    table: FileStoreTable,
    read_columns: list[str] | None,
    limit: int | None,
    predicate: Predicate | None,
) -> Any:
    """Create a read builder for ``table`` with the given pushdowns applied.

    Projection, limit and filter are applied in that order, and each one is
    skipped when its argument is ``None``.
    """
    builder = table.new_read_builder()
    pushdown_steps = (
        ("with_projection", read_columns),
        ("with_limit", limit),
        ("with_filter", predicate),
    )
    for method_name, argument in pushdown_steps:
        if argument is not None:
            builder = getattr(builder, method_name)(argument)
    return builder

def _read_pushdown_state(
self,
table: FileStoreTable,
Expand Down
4 changes: 4 additions & 0 deletions paimon-python/pypaimon/filesystem/caching_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ def wrap_with_caching_if_needed(file_io, options, cache=None):
return file_io
return CachingFileIO(file_io, cache, whitelist)

@property
def properties(self):
    """Return the ``properties`` of the delegate this object wraps."""
    delegate = self._delegate
    return delegate.properties

def new_input_stream(self, path: str):
file_type = FileType.classify(path)
if self._cache is None or file_type not in self._whitelist or FileType.is_mutable(path):
Expand Down
4 changes: 4 additions & 0 deletions paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def from_path(cls, table_path: str) -> 'FileStoreTable':

return cls(file_io, identifier, table_path, table_schema)

def schema(self) -> TableSchema:
    """Return the schema object held in ``self.table_schema``."""
    current_schema = self.table_schema
    return current_schema

def current_branch(self) -> str:
    """Return the branch name reported by the table identifier.

    Delegates to ``identifier.get_branch_name_or_default()``.
    """
    identifier = self.identifier
    return identifier.get_branch_name_or_default()
Expand Down
Loading
Loading