diff --git a/paimon-python/pypaimon/daft/__init__.py b/paimon-python/pypaimon/daft/__init__.py index e9651dd47744..b854830173d8 100644 --- a/paimon-python/pypaimon/daft/__init__.py +++ b/paimon-python/pypaimon/daft/__init__.py @@ -16,9 +16,9 @@ # limitations under the License. ################################################################################ -from pypaimon.daft.daft_paimon import read_paimon, write_paimon +from pypaimon.daft.daft_paimon import explain_paimon_scan, read_paimon, write_paimon -__all__ = ["read_paimon", "write_paimon", "PaimonCatalog", "PaimonTable"] +__all__ = ["explain_paimon_scan", "read_paimon", "write_paimon", "PaimonCatalog", "PaimonTable"] def __getattr__(name): diff --git a/paimon-python/pypaimon/daft/daft_catalog.py b/paimon-python/pypaimon/daft/daft_catalog.py index aaf3f6879d8a..97adbcc33776 100644 --- a/paimon-python/pypaimon/daft/daft_catalog.py +++ b/paimon-python/pypaimon/daft/daft_catalog.py @@ -216,6 +216,29 @@ def read(self, **options: Any) -> DataFrame: Table._validate_options("Paimon read", options, set()) return _read_table(self._inner, catalog_options=self._catalog_options) + def explain_scan( + self, + *, + filters: Any = None, + partition_filters: Any = None, + columns: list[str] | None = None, + limit: int | None = None, + io_config=None, + verbose: bool = False, + ) -> Any: + from pypaimon.daft.daft_paimon import _explain_table + + return _explain_table( + self._inner, + catalog_options=self._catalog_options, + filters=filters, + partition_filters=partition_filters, + columns=columns, + limit=limit, + io_config=io_config, + verbose=verbose, + ) + def append(self, df: DataFrame, **options: Any) -> None: from pypaimon.daft.daft_paimon import _write_table diff --git a/paimon-python/pypaimon/daft/daft_datasource.py b/paimon-python/pypaimon/daft/daft_datasource.py index 457fae375c57..7e4419d2ab75 100644 --- a/paimon-python/pypaimon/daft/daft_datasource.py +++ b/paimon-python/pypaimon/daft/daft_datasource.py @@ -18,7 +18,7 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, replace import logging from typing import TYPE_CHECKING, Any from urllib.parse import urlparse @@ -32,6 +32,12 @@ from daft.recordbatch import RecordBatch from pypaimon.daft.daft_compat import require_file_range_reads +from pypaimon.daft.daft_explain import ( + PaimonReaderSplitExplain, + PaimonScanExplain, + READER_MODE_NATIVE_PARQUET, + READER_MODE_PYPAIMON_FALLBACK, +) from pypaimon.daft.daft_predicate_visitor import convert_filters_to_paimon if TYPE_CHECKING: @@ -39,6 +45,7 @@ from pypaimon.common.predicate import Predicate from pypaimon.manifest.schema.data_file_meta import DataFileMeta + from pypaimon.read.explain import ExplainSplitInfo from pypaimon.read.table_read import TableRead from pypaimon.read.split import Split from pypaimon.table.file_store_table import FileStoreTable @@ -63,6 +70,16 @@ class _ReadPushdownState: source_limit: int | None +@dataclass(frozen=True, slots=True) +class _ReaderRouting: + reader_mode: str + fallback_reason: str | None + + @property + def use_native_reader(self) -> bool: + return self.reader_mode == READER_MODE_NATIVE_PARQUET + + class _PaimonPKSplitTask(DataSourceTask): """DataSourceTask for PK-table splits that require LSM-tree merge. @@ -189,6 +206,7 @@ def __init__( else {} ) + self._pushed_filters: list[PyExpr] | None = None self._paimon_predicate: Predicate | None = None self._remaining_filters: list[PyExpr] | None = None @@ -213,6 +231,7 @@ def push_filters(self, filters: list[PyExpr]) -> tuple[list[PyExpr], list[PyExpr """ pushed_filters, remaining_filters, paimon_predicate = convert_filters_to_paimon(self._table, filters) + self._pushed_filters = pushed_filters self._paimon_predicate = paimon_predicate self._remaining_filters = remaining_filters @@ -225,13 +244,17 @@ def push_filters(self, filters: list[PyExpr]) -> tuple[list[PyExpr], list[PyExpr return pushed_filters, remaining_filters - async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]: - read_table = self._table + def _read_table_for_scan(self) -> FileStoreTable: if self._has_blob_columns: - read_table = self._table.copy({"blob-as-descriptor": "true"}) + return self._table.copy({"blob-as-descriptor": "true"}) + return self._table - read_builder = read_table.new_read_builder() - read_pushdowns = self._read_pushdown_state(read_table, pushdowns) + def _scan_read_builder( + self, + table: FileStoreTable, + read_pushdowns: _ReadPushdownState, + ) -> Any: + read_builder = table.new_read_builder() if read_pushdowns.requested_columns is not None: read_builder = read_builder.with_projection(read_pushdowns.requested_columns) @@ -246,6 +269,13 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] read_pushdowns.planning_predicate, ) + return read_builder + + async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask]: + read_table = self._read_table_for_scan() + read_pushdowns = self._read_pushdown_state(read_table, pushdowns) + read_builder = self._scan_read_builder(read_table, read_pushdowns) + if self._table.partition_keys and pushdowns.partition_filters is None: logger.warning( "%s has partition keys %s but no partition filter was specified. " @@ -256,34 +286,21 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] plan = read_builder.new_scan().plan() - pv_cache: dict[tuple[Any, ...], RecordBatch | None] = {} + pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None] = {} for split in plan.splits(): - if self._table.partition_keys and pushdowns.partition_filters is not None: - pv_key = tuple(sorted(split.partition.to_dict().items())) - if pv_key not in pv_cache: - pv_cache[pv_key] = self._build_partition_values(split) - pv = pv_cache[pv_key] - if pv is not None and len(pv.filter(ExpressionsProjection([pushdowns.partition_filters]))) == 0: - continue - - _deletion_files = getattr(split, "data_deletion_files", None) - has_deletion_vectors = _deletion_files is not None and any(df is not None for df in _deletion_files) - - can_use_native_reader = ( - self._is_parquet - and not self._has_blob_columns - and (not self._table.is_primary_key_table or split.raw_convertible) - and not has_deletion_vectors + if self._partition_filter_skips_split(split, pushdowns, pv_cache): + continue + + routing = self._reader_routing( + raw_convertible=split.raw_convertible, + has_deletion_vectors=self._split_has_deletion_vectors(split), ) - if can_use_native_reader: + if routing.use_native_reader: pv = None if self._table.partition_keys: - pv_key = tuple(sorted(split.partition.to_dict().items())) - if pv_key not in pv_cache: - pv_cache[pv_key] = self._build_partition_values(split) - pv = pv_cache[pv_key] + pv = self._partition_values(split, pv_cache) for data_file in split.files: file_uri = self._build_file_uri(self._data_file_path(data_file)) @@ -297,18 +314,10 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] storage_config=self._storage_config, ) else: - if not self._is_parquet: - reason = "non-parquet format" - elif self._has_blob_columns: - reason = "blob columns present" - elif has_deletion_vectors: - reason = "deletion vectors present" - else: - reason = "LSM merge required" logger.debug( "Split with %d files using pypaimon fallback (%s).", len(split.files), - reason, + routing.fallback_reason, ) yield _PaimonPKSplitTask( self._fallback_read_builder( @@ -323,6 +332,168 @@ async def get_tasks(self, pushdowns: Pushdowns) -> AsyncIterator[DataSourceTask] self._blob_column_names, ) + def explain_scan(self, pushdowns: Pushdowns, verbose: bool = False) -> PaimonScanExplain: + read_table = self._read_table_for_scan() + read_pushdowns = self._read_pushdown_state(read_table, pushdowns) + read_builder = self._scan_read_builder(read_table, read_pushdowns) + + paimon_scan = read_builder.explain(verbose=True) + split_details = paimon_scan.splits or [] + + native_split_count = 0 + native_file_count = 0 + fallback_split_count = 0 + fallback_file_count = 0 + fallback_reasons: dict[str, int] = {} + explained_splits: list[PaimonReaderSplitExplain] | None = [] if verbose else None + pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None] = {} + + for split in split_details: + if self._partition_filter_skips_explain_split(split, pushdowns, pv_cache): + continue + + routing = self._reader_routing( + raw_convertible=split.raw_convertible, + has_deletion_vectors=split.has_deletion_vectors, + ) + if routing.use_native_reader: + native_split_count += 1 + native_file_count += split.file_count + else: + fallback_split_count += 1 + fallback_file_count += split.file_count + reason = routing.fallback_reason or "unknown" + fallback_reasons[reason] = fallback_reasons.get(reason, 0) + 1 + + if explained_splits is not None: + explained_splits.append( + PaimonReaderSplitExplain( + partition=split.partition, + bucket=split.bucket, + file_count=split.file_count, + row_count=split.row_count, + file_size=split.file_size, + reader_mode=routing.reader_mode, + fallback_reason=routing.fallback_reason, + file_paths=split.file_paths, + ) + ) + + if not verbose: + paimon_scan = replace(paimon_scan, splits=None) + + pushed_filters, remaining_filters = self._filter_pushdown_explain(pushdowns) + return PaimonScanExplain( + paimon_scan=paimon_scan, + native_parquet_split_count=native_split_count, + native_parquet_file_count=native_file_count, + pypaimon_fallback_split_count=fallback_split_count, + pypaimon_fallback_file_count=fallback_file_count, + fallback_reasons=fallback_reasons, + pushed_filters=pushed_filters, + remaining_filters=remaining_filters, + partition_filters=self._format_partition_filters(pushdowns), + requested_columns=read_pushdowns.requested_columns, + task_columns=read_pushdowns.task_columns, + fallback_read_columns=read_pushdowns.read_columns, + requested_limit=pushdowns.limit, + source_limit=read_pushdowns.source_limit, + limit_pushed=pushdowns.limit is not None and read_pushdowns.source_limit == pushdowns.limit, + splits=explained_splits, + ) + + def _reader_routing( + self, + raw_convertible: bool, + has_deletion_vectors: bool, + ) -> _ReaderRouting: + can_use_native_reader = ( + self._is_parquet + and not self._has_blob_columns + and (not self._table.is_primary_key_table or raw_convertible) + and not has_deletion_vectors + ) + if can_use_native_reader: + return _ReaderRouting(READER_MODE_NATIVE_PARQUET, None) + + if not self._is_parquet: + reason = "non-parquet format" + elif self._has_blob_columns: + reason = "blob columns present" + elif has_deletion_vectors: + reason = "deletion vectors present" + else: + reason = "LSM merge required" + return _ReaderRouting(READER_MODE_PYPAIMON_FALLBACK, reason) + + @staticmethod + def _split_has_deletion_vectors(split: Split) -> bool: + deletion_files = getattr(split, "data_deletion_files", None) + return deletion_files is not None and any(df is not None for df in deletion_files) + + def _partition_filter_skips_split( + self, + split: Split, + pushdowns: Pushdowns, + pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None], + ) -> bool: + if not self._table.partition_keys or pushdowns.partition_filters is None: + return False + pv = self._partition_values(split, pv_cache) + return self._partition_filter_skips_values(pv, pushdowns) + + def _partition_filter_skips_explain_split( + self, + split: ExplainSplitInfo, + pushdowns: Pushdowns, + pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None], + ) -> bool: + if not self._table.partition_keys or pushdowns.partition_filters is None: + return False + pv = self._partition_values_from_dict(split.partition, pv_cache) + return self._partition_filter_skips_values(pv, pushdowns) + + @staticmethod + def _partition_filter_skips_values( + partition_values: RecordBatch | None, + pushdowns: Pushdowns, + ) -> bool: + return ( + partition_values is not None + and len(partition_values.filter(ExpressionsProjection([pushdowns.partition_filters]))) == 0 + ) + + def _format_partition_filters(self, pushdowns: Pushdowns) -> list[str]: + if pushdowns.partition_filters is None: + return [] + return self._format_pyexprs([getattr(pushdowns.partition_filters, "_expr", pushdowns.partition_filters)]) + + def _filter_pushdown_explain(self, pushdowns: Pushdowns) -> tuple[list[str], list[str]]: + if self._remaining_filters is not None: + return ( + self._format_pyexprs(self._pushed_filters or []), + self._format_pyexprs(self._remaining_filters), + ) + + if pushdowns.filters is None: + return [], [] + + py_expr = getattr(pushdowns.filters, "_expr", pushdowns.filters) + pushed_filters, remaining_filters, _ = convert_filters_to_paimon(self._table, [py_expr]) + return self._format_pyexprs(pushed_filters), self._format_pyexprs(remaining_filters) + + @staticmethod + def _format_pyexprs(py_exprs: list[PyExpr]) -> list[str]: + from daft.expressions import Expression + + result = [] + for py_expr in py_exprs: + try: + result.append(str(Expression._from_pyexpr(py_expr))) + except Exception: + result.append(str(py_expr)) + return result + def _build_file_uri(self, file_path: str) -> str: """Reconstruct a full URI from a (potentially scheme-stripped) file_path.""" if urlparse(file_path).scheme: @@ -337,10 +508,29 @@ def _data_file_path(data_file: DataFileMeta) -> str: def _build_partition_values(self, split: Split) -> daft.recordbatch.RecordBatch | None: """Build a single-row RecordBatch encoding the partition values for a split.""" + return self._build_partition_values_from_dict(split.partition.to_dict()) + + def _partition_values( + self, + split: Split, + pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None], + ) -> RecordBatch | None: + return self._partition_values_from_dict(split.partition.to_dict(), pv_cache) + + def _partition_values_from_dict( + self, + partition_dict: dict[str, Any], + pv_cache: dict[tuple[tuple[str, Any], ...], RecordBatch | None], + ) -> RecordBatch | None: + pv_key = tuple(sorted(partition_dict.items())) + if pv_key not in pv_cache: + pv_cache[pv_key] = self._build_partition_values_from_dict(partition_dict) + return pv_cache[pv_key] + + def _build_partition_values_from_dict(self, partition_dict: dict[str, Any]) -> daft.recordbatch.RecordBatch | None: if not self._table.partition_keys: return None - partition_dict = split.partition.to_dict() arrays: dict[str, daft.Series] = {} for pfield in self._table.partition_keys_fields: value = partition_dict.get(pfield.name) diff --git a/paimon-python/pypaimon/daft/daft_explain.py b/paimon-python/pypaimon/daft/daft_explain.py new file mode 100644 index 000000000000..6c97f393aea0 --- /dev/null +++ b/paimon-python/pypaimon/daft/daft_explain.py @@ -0,0 +1,160 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Structured explain result for Daft Paimon scans.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from pypaimon.read.explain import ExplainResult + + +READER_MODE_NATIVE_PARQUET = "native_parquet" +READER_MODE_PYPAIMON_FALLBACK = "pypaimon_fallback" + + +@dataclass(frozen=True, slots=True) +class PaimonReaderSplitExplain: + partition: dict[str, Any] + bucket: int + file_count: int + row_count: int + file_size: int + reader_mode: str + fallback_reason: str | None + file_paths: list[str] + + +@dataclass(frozen=True, slots=True) +class PaimonScanExplain: + paimon_scan: ExplainResult + + native_parquet_split_count: int = 0 + native_parquet_file_count: int = 0 + pypaimon_fallback_split_count: int = 0 + pypaimon_fallback_file_count: int = 0 + fallback_reasons: dict[str, int] = field(default_factory=dict) + + pushed_filters: list[str] = field(default_factory=list) + remaining_filters: list[str] = field(default_factory=list) + partition_filters: list[str] = field(default_factory=list) + + requested_columns: list[str] | None = None + task_columns: list[str] | None = None + fallback_read_columns: list[str] | None = None + + requested_limit: int | None = None + source_limit: int | None = None + limit_pushed: bool = False + + splits: list[PaimonReaderSplitExplain] | None = None + + @property + def total_split_count(self) -> int: + return self.native_parquet_split_count + self.pypaimon_fallback_split_count + + @property + def total_file_count(self) -> int: + return self.native_parquet_file_count + self.pypaimon_fallback_file_count + + def __str__(self) -> str: + return render_daft_paimon_explain(self) + + +def render_daft_paimon_explain(result: PaimonScanExplain) -> str: + out = [] + out.append("== Daft Paimon Scan ==") + _line(out, "Native Parquet splits", _count_files( + result.native_parquet_split_count, + result.native_parquet_file_count, + )) + _line(out, "pypaimon fallback splits", _count_files( + result.pypaimon_fallback_split_count, + result.pypaimon_fallback_file_count, + )) + _line(out, "Fallback reasons", _format_reason_counts(result.fallback_reasons)) + _line(out, "Pushed filters", _format_list(result.pushed_filters)) + _line(out, "Remaining filters", _format_list(result.remaining_filters)) + _line(out, "Partition filters", _format_list(result.partition_filters)) + _line(out, "Requested columns", _format_optional_list(result.requested_columns, "")) + _line(out, "Task columns", _format_optional_list(result.task_columns, "")) + _line(out, "Fallback read columns", _format_optional_list( + result.fallback_read_columns, + "", + )) + _line(out, "Limit", _format_limit(result)) + + if result.splits is not None: + out.append("") + out.append("Splits:") + for index, split in enumerate(result.splits): + suffix = "" if split.fallback_reason is None else " ({})".format(split.fallback_reason) + out.append( + " #{} bucket={} files={} rows={} size={} mode={}{}".format( + index, + split.bucket, + split.file_count, + split.row_count, + split.file_size, + split.reader_mode, + suffix, + ) + ) + + out.append("") + out.append(str(result.paimon_scan).rstrip()) + return "\n".join(out) + + +def _line(out: list[str], key: str, value: str) -> None: + out.append("{:<28} {}".format(key + ":", value)) + + +def _count_files(split_count: int, file_count: int) -> str: + return "{} ({} files)".format(split_count, file_count) + + +def _format_reason_counts(reasons: dict[str, int]) -> str: + if not reasons: + return "" + return ", ".join("{}: {}".format(reason, count) for reason, count in sorted(reasons.items())) + + +def _format_list(values: list[str]) -> str: + if not values: + return "" + return ", ".join(values) + + +def _format_optional_list(values: list[str] | None, empty: str) -> str: + if values is None: + return empty + if not values: + return "[]" + return "[{}]".format(", ".join(values)) + + +def _format_limit(result: PaimonScanExplain) -> str: + if result.requested_limit is None: + return "" + pushed = "pushed" if result.limit_pushed else "not pushed" + source = "" if result.source_limit is None else str(result.source_limit) + return "requested {}, source {} ({})".format(result.requested_limit, source, pushed) diff --git a/paimon-python/pypaimon/daft/daft_paimon.py b/paimon-python/pypaimon/daft/daft_paimon.py index 245cea534b2c..29825fbc11a9 100644 --- a/paimon-python/pypaimon/daft/daft_paimon.py +++ b/paimon-python/pypaimon/daft/daft_paimon.py @@ -20,20 +20,22 @@ Usage:: - from pypaimon.daft import read_paimon, write_paimon + from pypaimon.daft import explain_paimon_scan, read_paimon, write_paimon df = read_paimon("db.table", catalog_options={"warehouse": "/path"}) + explain = explain_paimon_scan("db.table", catalog_options={"warehouse": "/path"}) write_paimon(df, "db.table", catalog_options={"warehouse": "/path"}) """ from __future__ import annotations -from typing import TYPE_CHECKING, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional from urllib.parse import urlparse if TYPE_CHECKING: import daft + from pypaimon.daft.daft_explain import PaimonScanExplain from pypaimon.table.file_store_table import FileStoreTable @@ -57,19 +59,31 @@ def _enrich_options_with_rest_token( return enriched -def _read_table( +def _time_travel_table( table: FileStoreTable, - catalog_options: Dict[str, str] | None = None, - io_config=None, snapshot_id: int | None = None, tag_name: str | None = None, -) -> "daft.DataFrame": - """Read a Paimon table object into a lazy Daft DataFrame.""" +) -> FileStoreTable: if snapshot_id is not None and tag_name is not None: raise ValueError( "snapshot_id and tag_name cannot be set at the same time" ) + travel_options: dict[str, str] = {} + if snapshot_id is not None: + travel_options["scan.snapshot-id"] = str(snapshot_id) + if tag_name is not None: + travel_options["scan.tag-name"] = tag_name + if travel_options: + return table.copy(travel_options) + return table + + +def _source_for_table( + table: FileStoreTable, + catalog_options: Dict[str, str] | None = None, + io_config=None, +): from daft import context, runners from daft.daft import StorageConfig @@ -78,14 +92,6 @@ def _read_table( _convert_paimon_catalog_options_to_io_config, ) - travel_options: dict[str, str] = {} - if snapshot_id is not None: - travel_options["scan.snapshot-id"] = str(snapshot_id) - if tag_name is not None: - travel_options["scan.tag-name"] = tag_name - if travel_options: - table = table.copy(travel_options) - if catalog_options is None: catalog_options = {} @@ -97,10 +103,71 @@ def _read_table( multithreaded_io = runners.get_or_create_runner().name != "ray" storage_config = StorageConfig(multithreaded_io, io_config) - source = PaimonDataSource( + return PaimonDataSource( table, storage_config=storage_config, catalog_options=catalog_options ) - return source.read() + + +def _read_table( + table: FileStoreTable, + catalog_options: Dict[str, str] | None = None, + io_config=None, + snapshot_id: int | None = None, + tag_name: str | None = None, +) -> "daft.DataFrame": + """Read a Paimon table object into a lazy Daft DataFrame.""" + table = _time_travel_table(table, snapshot_id=snapshot_id, tag_name=tag_name) + return _source_for_table(table, catalog_options=catalog_options, io_config=io_config).read() + + +def _normalize_explain_filters(filters: Any) -> tuple[Any, list[Any]]: + if filters is None: + return None, [] + + if isinstance(filters, (list, tuple)): + if not filters: + return None, [] + filter_exprs = list(filters) + combined = filter_exprs[0] + for filter_expr in filter_exprs[1:]: + combined = combined & filter_expr + else: + filter_exprs = [filters] + combined = filters + + return combined, [getattr(filter_expr, "_expr", filter_expr) for filter_expr in filter_exprs] + + +def _explain_table( + table: FileStoreTable, + catalog_options: Dict[str, str] | None = None, + io_config=None, + snapshot_id: int | None = None, + tag_name: str | None = None, + filters: Any = None, + partition_filters: Any = None, + columns: list[str] | None = None, + limit: int | None = None, + verbose: bool = False, +) -> "PaimonScanExplain": + """Explain a Paimon table object using Daft's datasource pushdown model.""" + from daft.io.pushdowns import Pushdowns + + table = _time_travel_table(table, snapshot_id=snapshot_id, tag_name=tag_name) + source = _source_for_table(table, catalog_options=catalog_options, io_config=io_config) + filter_expr, filter_pyexprs = _normalize_explain_filters(filters) + partition_filter_expr, _ = _normalize_explain_filters(partition_filters) + if filter_pyexprs: + source.push_filters(filter_pyexprs) + return source.explain_scan( + Pushdowns( + filters=filter_expr, + partition_filters=partition_filter_expr, + columns=columns, + limit=limit, + ), + verbose=verbose, + ) def _write_table( @@ -159,6 +226,44 @@ def read_paimon( ) +def explain_paimon_scan( + table_identifier: str, + catalog_options: Dict[str, str], + *, + filters: Any = None, + partition_filters: Any = None, + columns: list[str] | None = None, + limit: int | None = None, + snapshot_id: Optional[int] = None, + tag_name: Optional[str] = None, + io_config=None, + verbose: bool = False, +) -> "PaimonScanExplain": + """Explain a Paimon scan through Daft's reader-routing layer. + + The optional ``filters`` argument accepts a Daft expression or a list of + Daft expressions. Lists are treated as conjunctions, matching how multiple + pushed filters reach Daft datasources. + """ + from pypaimon.catalog.catalog_factory import CatalogFactory + + catalog = CatalogFactory.create(catalog_options) + table = catalog.get_table(table_identifier) + + return _explain_table( + table, + catalog_options=catalog_options, + io_config=io_config, + snapshot_id=snapshot_id, + tag_name=tag_name, + filters=filters, + partition_filters=partition_filters, + columns=columns, + limit=limit, + verbose=verbose, + ) + + def write_paimon( df: "daft.DataFrame", table_identifier: str, diff --git a/paimon-python/pypaimon/tests/daft/daft_explain_test.py b/paimon-python/pypaimon/tests/daft/daft_explain_test.py new file mode 100644 index 000000000000..0df051adf9de --- /dev/null +++ b/paimon-python/pypaimon/tests/daft/daft_explain_test.py @@ -0,0 +1,368 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Tests for Daft-side Paimon scan explain diagnostics.""" + +from __future__ import annotations + +import pyarrow as pa +import pytest + +pypaimon = pytest.importorskip("pypaimon") +daft = pytest.importorskip("daft") + +from daft import col + +from pypaimon.daft import explain_paimon_scan +from pypaimon.daft.daft_catalog import PaimonTable +from pypaimon.daft.daft_explain import ( + READER_MODE_NATIVE_PARQUET, + READER_MODE_PYPAIMON_FALLBACK, +) +from pypaimon.daft.daft_compat import has_file_range_reads +from pypaimon.daft.daft_datasource import PaimonDataSource +from pypaimon.daft.daft_paimon import _explain_table +from pypaimon.read.explain import ExplainResult, ExplainSplitInfo + + +requires_blob = pytest.mark.skipif(not has_file_range_reads(), reason="BLOB support requires daft >= 0.7.11") + + +@pytest.fixture +def catalog_options(tmp_path): + options = {"warehouse": str(tmp_path)} + catalog = pypaimon.CatalogFactory.create(options) + catalog.create_database("test_db", ignore_if_exists=True) + return options + + +def _create_table( + catalog_options, + table_name: str, + pa_schema: pa.Schema, + *, + partition_keys: list[str] | None = None, + primary_keys: list[str] | None = None, + options: dict[str, str] | None = None, +): + identifier = f"test_db.{table_name}" + catalog = pypaimon.CatalogFactory.create(catalog_options) + schema = pypaimon.Schema.from_pyarrow_schema( + pa_schema, + partition_keys=partition_keys, + primary_keys=primary_keys, + options=options, + ) + catalog.create_table(identifier, schema, ignore_if_exists=False) + return identifier, catalog.get_table(identifier) + + +def _write_arrow(table, data: pa.Table) -> None: + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + try: + table_write.write_arrow(data) + table_commit.commit(table_write.prepare_commit()) + finally: + table_write.close() + table_commit.close() + + +def _single_split_explain( + *, + table_identifier: str, + raw_convertible: bool, + has_deletion_vectors: bool, +) -> ExplainResult: + split = ExplainSplitInfo( + partition={}, + bucket=0, + file_count=1, + row_count=4, + merged_row_count=None, + file_size=128, + raw_convertible=raw_convertible, + has_deletion_vectors=has_deletion_vectors, + level_histogram={0: 1}, + deletion_file_count=1 if has_deletion_vectors else 0, + file_paths=["/tmp/fake.parquet"], + ) + return ExplainResult( + table_identifier=table_identifier, + is_primary_key_table=False, + bucket_mode="unaware", + deletion_vectors_enabled=has_deletion_vectors, + data_evolution_enabled=False, + snapshot_id=1, + schema_id=0, + file_count=1, + total_file_size=split.file_size, + estimated_row_count=split.row_count, + deletion_file_count=split.deletion_file_count, + level_histogram=split.level_histogram, + split_count=1, + splits_raw_convertible=1 if raw_convertible else 0, + splits_with_deletion_vectors=1 if has_deletion_vectors else 0, + files_per_split_min=1, + files_per_split_max=1, + files_per_split_avg=1.0, + split_size_min=split.file_size, + split_size_max=split.file_size, + split_size_avg=float(split.file_size), + split_size_p50=split.file_size, + split_size_p95=split.file_size, + splits=[split], + ) + + +def test_explain_paimon_scan_reports_native_parquet_routing(catalog_options): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("name", pa.string()), + ]) + identifier, table = _create_table( + catalog_options, + "explain_native", + pa_schema, + options={"bucket": "-1", "file.format": "parquet"}, + ) + _write_arrow(table, pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]}, schema=pa_schema)) + + result = explain_paimon_scan( + identifier, + catalog_options, + filters=col("id") == 2, + columns=["name"], + limit=1, + verbose=True, + ) + + assert result.native_parquet_split_count == result.paimon_scan.split_count + assert result.native_parquet_split_count > 0 + assert result.pypaimon_fallback_split_count == 0 + assert result.fallback_reasons == {} + assert result.requested_columns == ["name"] + assert result.requested_limit == 1 + assert result.source_limit == 1 + assert result.limit_pushed is True + assert any("id" in pushed for pushed in result.pushed_filters) + assert result.remaining_filters == [] + assert result.splits is not None + assert all(split.reader_mode == READER_MODE_NATIVE_PARQUET for split in result.splits) + assert "Daft Paimon Scan" in str(result) + assert "PyPaimon Scan Plan" in str(result) + + +def test_explain_scan_keeps_limit_above_remaining_filters(catalog_options): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("name", pa.string()), + ]) + identifier, table = _create_table( + catalog_options, + "explain_remaining_filter", + pa_schema, + options={"bucket": "-1", "file.format": "parquet"}, + ) + _write_arrow(table, pa.table({"id": [1, 2], "name": ["a", "b"]}, schema=pa_schema)) + + result = PaimonTable(table, catalog_options=catalog_options).explain_scan( + filters=~(col("id") == 1), + limit=1, + ) + + assert result.native_parquet_split_count == result.paimon_scan.split_count + assert result.pypaimon_fallback_split_count == 0 + assert result.pushed_filters == [] + assert any("id" in remaining for remaining in result.remaining_filters) + assert result.source_limit is None + assert result.limit_pushed is False + assert result.splits is None + assert result.paimon_scan.splits is None + + +def test_explain_scan_applies_partition_filters_to_reader_counts(catalog_options): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("name", pa.string()), + ("dt", pa.string()), + ]) + identifier, table = _create_table( + catalog_options, + "explain_partition_filter", + pa_schema, + partition_keys=["dt"], + options={"bucket": "1", "file.format": "parquet"}, + ) + _write_arrow( + table, + pa.table({"id": [1], "name": ["a"], "dt": ["2024-01-01"]}, schema=pa_schema), + ) + _write_arrow( + table, + pa.table({"id": [2], "name": ["b"], "dt": ["2024-01-02"]}, schema=pa_schema), + ) + + result = explain_paimon_scan( + identifier, + catalog_options, + partition_filters=col("dt") == "2024-01-02", + verbose=True, + ) + + assert result.paimon_scan.split_count == 2 + assert result.native_parquet_split_count == 1 + assert result.pypaimon_fallback_split_count == 0 + assert any("dt" in partition_filter for partition_filter in result.partition_filters) + assert result.splits is not None + assert len(result.splits) == 1 + assert result.splits[0].partition == {"dt": "2024-01-02"} + + +def test_explain_scan_reports_pk_lsm_fallback(catalog_options): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("name", pa.string()), + ("dt", pa.string()), + ]) + _, table = _create_table( + catalog_options, + "explain_pk_fallback", + pa_schema, + partition_keys=["dt"], + primary_keys=["id", "dt"], + options={"bucket": "1", "file.format": "parquet"}, + ) + _write_arrow( + table, + pa.table({"id": [1, 2], "name": ["old-a", "old-b"], "dt": ["2024-01-01", "2024-01-01"]}, schema=pa_schema), + ) + _write_arrow( + table, + pa.table({"id": [1], "name": ["new-a"], "dt": ["2024-01-01"]}, schema=pa_schema), + ) + + result = _explain_table( + table, + catalog_options=catalog_options, + filters=col("id") == 1, + columns=["name"], + limit=1, + verbose=True, + ) + + assert result.pypaimon_fallback_split_count > 0 + assert result.native_parquet_split_count == 0 + assert result.fallback_reasons["LSM merge required"] == result.pypaimon_fallback_split_count + assert result.fallback_read_columns is not None + assert "name" in result.fallback_read_columns + assert "id" in result.fallback_read_columns + assert result.splits is not None + assert all(split.reader_mode == READER_MODE_PYPAIMON_FALLBACK for split in result.splits) + assert all(split.fallback_reason == "LSM merge required" for split in result.splits) + + +def test_explain_scan_reports_non_parquet_fallback(catalog_options): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("name", pa.string()), + ]) + _, table = _create_table( + catalog_options, + "explain_avro_fallback", + pa_schema, + options={"bucket": "-1", "file.format": "avro"}, + ) + _write_arrow(table, pa.table({"id": [1], "name": ["a"]}, schema=pa_schema)) + + result = _explain_table(table, catalog_options=catalog_options, verbose=True) + + assert result.pypaimon_fallback_split_count == result.paimon_scan.split_count + assert result.pypaimon_fallback_split_count > 0 + assert result.native_parquet_split_count == 0 + assert result.fallback_reasons["non-parquet format"] == result.pypaimon_fallback_split_count + assert result.splits is not None + assert all(split.fallback_reason == "non-parquet format" for split in result.splits) + + +@requires_blob +def test_explain_scan_reports_blob_fallback(catalog_options): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("payload", pa.large_binary()), + ]) + _, table = _create_table( + catalog_options, + "explain_blob_fallback", + pa_schema, + options={ + "bucket": "-1", + "file.format": "parquet", + "row-tracking.enabled": "true", + "data-evolution.enabled": "true", + }, + ) + _write_arrow(table, pa.table({"id": [1], "payload": [b"hello"]}, schema=pa_schema)) + + result = _explain_table(table, catalog_options=catalog_options, verbose=True) + + assert result.pypaimon_fallback_split_count == result.paimon_scan.split_count + assert result.pypaimon_fallback_split_count > 0 + assert result.native_parquet_split_count == 0 + assert result.fallback_reasons["blob columns present"] == result.pypaimon_fallback_split_count + assert result.splits is not None + assert all(split.reader_mode == READER_MODE_PYPAIMON_FALLBACK for split in result.splits) + assert all(split.fallback_reason == "blob columns present" for split in result.splits) + + +def test_explain_scan_reports_deletion_vector_fallback(catalog_options, monkeypatch): + pa_schema = pa.schema([ + ("id", pa.int64()), + ("name", pa.string()), + ]) + _, table = _create_table( + catalog_options, + "explain_deletion_vector_fallback", + pa_schema, + options={"bucket": "-1", "file.format": "parquet"}, + ) + + class FakeReadBuilder: + def explain(self, verbose: bool = False) -> ExplainResult: + assert verbose is True + return _single_split_explain( + table_identifier="test_db.explain_deletion_vector_fallback", + raw_convertible=True, + has_deletion_vectors=True, + ) + + def fake_scan_read_builder(self, table, read_pushdowns): + return FakeReadBuilder() + + monkeypatch.setattr(PaimonDataSource, "_scan_read_builder", fake_scan_read_builder) + + result = _explain_table(table, catalog_options=catalog_options, verbose=True) + + assert result.pypaimon_fallback_split_count == 1 + assert result.native_parquet_split_count == 0 + assert result.fallback_reasons == {"deletion vectors present": 1} + assert result.splits is not None + assert len(result.splits) == 1 + assert result.splits[0].reader_mode == READER_MODE_PYPAIMON_FALLBACK + assert result.splits[0].fallback_reason == "deletion vectors present"