Skip to content

Commit 3e35bfd

Browse files
committed
feat: Add support for rest scan planning
1 parent 663be50 commit 3e35bfd

File tree

7 files changed

+690
-34
lines changed

7 files changed

+690
-34
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,13 @@ def close(self) -> None: # noqa: B027
794794
Default implementation does nothing. Override in subclasses that need cleanup.
795795
"""
796796

797+
def is_rest_scan_planning_enabled(self) -> bool:
    """Report whether this catalog plans table scans on the server side.

    The base implementation unconditionally answers ``False``.

    Returns:
        Always False for this default implementation.
    """
    return False
803+
797804
def __enter__(self) -> Catalog:
798805
"""Enter the context manager.
799806

pyiceberg/catalog/rest/__init__.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from collections import deque
18+
from collections.abc import Iterator
1719
from enum import Enum
1820
from typing import (
1921
TYPE_CHECKING,
2022
Any,
2123
Union,
2224
)
2325

24-
from pydantic import ConfigDict, Field, field_validator
26+
from pydantic import ConfigDict, Field, TypeAdapter, field_validator
2527
from requests import HTTPError, Session
2628
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
2729

@@ -36,6 +38,16 @@
3638
)
3739
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
3840
from pyiceberg.catalog.rest.response import _handle_non_200_response
41+
from pyiceberg.catalog.rest.scan_planning import (
42+
FetchScanTasksRequest,
43+
PlanCancelled,
44+
PlanCompleted,
45+
PlanFailed,
46+
PlanningResponse,
47+
PlanSubmitted,
48+
PlanTableScanRequest,
49+
ScanTasks,
50+
)
3951
from pyiceberg.exceptions import (
4052
AuthorizationExpiredError,
4153
CommitFailedException,
@@ -44,6 +56,7 @@
4456
NamespaceNotEmptyError,
4557
NoSuchIdentifierError,
4658
NoSuchNamespaceError,
59+
NoSuchPlanTaskError,
4760
NoSuchTableError,
4861
NoSuchViewError,
4962
TableAlreadyExistsError,
@@ -56,6 +69,7 @@
5669
CommitTableRequest,
5770
CommitTableResponse,
5871
CreateTableTransaction,
72+
FileScanTask,
5973
StagedTable,
6074
Table,
6175
TableIdentifier,
@@ -322,6 +336,9 @@ class ListViewsResponse(IcebergBaseModel):
322336
identifiers: list[ListViewResponseEntry] = Field()
323337

324338

339+
# Module-level adapter, built once at import time, used to parse scan-planning responses from JSON.
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)
340+
341+
325342
class RestCatalog(Catalog):
326343
uri: str
327344
_session: Session
@@ -391,6 +408,103 @@ def is_rest_scan_planning_enabled(self) -> bool:
391408
self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT
392409
)
393410

411+
@retry(**_RETRY_ARGS)
def _plan_table_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> PlanningResponse:
    """Send a scan planning request for a table to the REST server.

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters.

    Returns:
        The parsed planning response, which represents the status of the plan.

    Raises:
        NoSuchTableError: If a table with the given identifier does not exist.
    """
    self._check_endpoint(Capability.V1_SUBMIT_TABLE_SCAN_PLAN)

    path_params = self._split_identifier_for_path(identifier)
    endpoint = self.url(Endpoints.plan_table_scan, prefixed=True, **path_params)
    # Fields that are None are left out of the request payload entirely.
    payload = request.model_dump(by_alias=True, exclude_none=True)

    response = self._session.post(endpoint, json=payload)
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchTableError})

    return _PLANNING_RESPONSE_ADAPTER.validate_json(response.text)
435+
436+
@retry(**_RETRY_ARGS)
def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> ScanTasks:
    """Retrieve a further batch of scan tasks for a plan-task token.

    Args:
        identifier: Table identifier.
        plan_task: The plan task token from a previous response.

    Returns:
        ScanTasks holding file scan tasks and possibly more plan-task tokens.

    Raises:
        NoSuchPlanTaskError: If the server answers 404 for the given identifier or plan task.
    """
    self._check_endpoint(Capability.V1_TABLE_SCAN_PLAN_TASKS)

    body = FetchScanTasksRequest(plan_task=plan_task)
    path_params = self._split_identifier_for_path(identifier)
    endpoint = self.url(Endpoints.fetch_scan_tasks, prefixed=True, **path_params)

    response = self._session.post(endpoint, json=body.model_dump(by_alias=True))
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {404: NoSuchPlanTaskError})

    return ScanTasks.model_validate_json(response.text)
462+
463+
def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> Iterator["FileScanTask"]:
    """Plan a table scan on the server and stream the resulting FileScanTasks.

    Submits the plan request, then follows every plan-task token the server
    hands back until none remain. Tasks are yielded batch by batch, each
    batch being converted together with its own delete files.

    This is a generator, so the request is only sent (and errors are only
    raised) once iteration starts.

    Args:
        identifier: Table identifier.
        request: The scan plan request parameters.

    Yields:
        FileScanTask objects ready for execution.

    Raises:
        RuntimeError: If planning fails, is cancelled, or returns unexpected response.
        NotImplementedError: If async planning is required but not yet supported.
    """
    plan = self._plan_table_scan(identifier, request)

    if isinstance(plan, PlanFailed):
        raise RuntimeError(f"Received status: failed: {plan.error.message}")
    if isinstance(plan, PlanCancelled):
        raise RuntimeError("Received status: cancelled")
    if isinstance(plan, PlanSubmitted):
        # TODO: implement polling for async planning
        raise NotImplementedError(f"Async scan planning not yet supported for planId: {plan.plan_id}")
    if not isinstance(plan, PlanCompleted):
        raise RuntimeError(f"Invalid planStatus for response: {type(plan).__name__}")

    # Tasks that came back directly with the completed plan.
    yield from (FileScanTask.from_rest_response(rest_task, plan.delete_files) for rest_task in plan.file_scan_tasks)

    # Remaining batches are reached through plan-task tokens, processed first-in first-out;
    # a fetched batch may itself contribute more tokens.
    token_queue = deque(plan.plan_tasks)
    while token_queue:
        page = self._fetch_scan_tasks(identifier, token_queue.popleft())
        yield from (FileScanTask.from_rest_response(rest_task, page.delete_files) for rest_task in page.file_scan_tasks)
        token_queue.extend(page.plan_tasks)
507+
394508
def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
395509
"""Create the LegacyOAuth2AuthManager by fetching required properties.
396510

pyiceberg/catalog/rest/scan_planning.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@
2525

2626
from pyiceberg.catalog.rest.response import ErrorResponseMessage
2727
from pyiceberg.expressions import BooleanExpression
28-
from pyiceberg.manifest import FileFormat
28+
from pyiceberg.manifest import DataFileContent, FileFormat
2929
from pyiceberg.typedef import IcebergBaseModel
3030

31+
# Lookup from content-type strings ("data", "position-deletes", "equality-deletes")
# to the corresponding DataFileContent enum member.
CONTENT_TYPE_MAP: dict[str, DataFileContent] = {
    "data": DataFileContent.DATA,
    "position-deletes": DataFileContent.POSITION_DELETES,
    "equality-deletes": DataFileContent.EQUALITY_DELETES,
}
36+
3137
# Primitive types that can appear in partition values and bounds
3238
PrimitiveTypeValue: TypeAlias = bool | int | float | str | Decimal | UUID | date | time | datetime | bytes
3339

pyiceberg/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ class NoSuchNamespaceError(Exception):
5252
"""Raised when a referenced name-space is not found."""
5353

5454

55+
class NoSuchPlanError(Exception):
    """Raised when no scan plan exists for a given plan ID."""
57+
58+
59+
class NoSuchPlanTaskError(Exception):
    """Raised when a referenced scan plan task cannot be found."""
61+
62+
5563
class RESTError(Exception):
5664
"""Raises when there is an unknown response from the REST Catalog."""
5765

pyiceberg/table/__init__.py

Lines changed: 108 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@
145145
from pyiceberg_core.datafusion import IcebergDataFusionTable
146146

147147
from pyiceberg.catalog import Catalog
148+
from pyiceberg.catalog.rest.scan_planning import (
149+
RESTContentFile,
150+
RESTDeleteFile,
151+
RESTFileScanTask,
152+
)
148153

149154
ALWAYS_TRUE = AlwaysTrue()
150155
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -1168,6 +1173,8 @@ def scan(
11681173
snapshot_id=snapshot_id,
11691174
options=options,
11701175
limit=limit,
1176+
catalog=self.catalog,
1177+
table_identifier=self._identifier,
11711178
)
11721179

11731180
@property
@@ -1684,6 +1691,8 @@ class TableScan(ABC):
16841691
snapshot_id: int | None
16851692
options: Properties
16861693
limit: int | None
1694+
catalog: Catalog | None
1695+
table_identifier: Identifier | None
16871696

16881697
def __init__(
16891698
self,
@@ -1695,6 +1704,8 @@ def __init__(
16951704
snapshot_id: int | None = None,
16961705
options: Properties = EMPTY_DICT,
16971706
limit: int | None = None,
1707+
catalog: Catalog | None = None,
1708+
table_identifier: Identifier | None = None,
16981709
):
16991710
self.table_metadata = table_metadata
17001711
self.io = io
@@ -1704,6 +1715,8 @@ def __init__(
17041715
self.snapshot_id = snapshot_id
17051716
self.options = options
17061717
self.limit = limit
1718+
self.catalog = catalog
1719+
self.table_identifier = table_identifier
17071720

17081721
def snapshot(self) -> Snapshot | None:
17091722
if self.snapshot_id:
@@ -1798,6 +1811,67 @@ def __init__(
17981811
self.delete_files = delete_files or set()
17991812
self.residual = residual
18001813

1814+
@staticmethod
def from_rest_response(
    rest_task: RESTFileScanTask,
    delete_files: list[RESTDeleteFile],
) -> FileScanTask:
    """Build a FileScanTask from its REST representation.

    The REST task refers to its delete files by position in ``delete_files``;
    each reference is looked up and converted alongside the data file.

    Args:
        rest_task: The REST file scan task.
        delete_files: The list of delete files from the ScanTasks response.

    Returns:
        A FileScanTask with the converted data and delete files.

    Raises:
        NotImplementedError: If equality delete files are encountered.
    """
    from pyiceberg.catalog.rest.scan_planning import RESTEqualityDeleteFile

    # The data file keeps its column statistics; delete files are converted without them.
    converted_data_file = _rest_file_to_data_file(rest_task.data_file, include_stats=True)

    def _resolve(position: int) -> DataFile:
        referenced = delete_files[position]
        if isinstance(referenced, RESTEqualityDeleteFile):
            raise NotImplementedError(f"PyIceberg does not yet support equality deletes: {referenced.file_path}")
        return _rest_file_to_data_file(referenced, include_stats=False)

    # A missing or empty reference list simply yields an empty set.
    attached_deletes: set[DataFile] = {_resolve(position) for position in rest_task.delete_file_references or ()}

    return FileScanTask(
        data_file=converted_data_file,
        delete_files=attached_deletes,
        # Falls back to ALWAYS_TRUE when the server sent no (truthy) residual filter.
        residual=rest_task.residual_filter or ALWAYS_TRUE,
    )
1848+
1849+
1850+
def _rest_file_to_data_file(rest_file: RESTContentFile, *, include_stats: bool) -> DataFile:
    """Translate a REST content file into a manifest DataFile.

    Args:
        rest_file: The REST content file to convert.
        include_stats: Whether the per-column metrics (column sizes and value,
            null-value and NaN-value counts) are carried over. When False,
            all four are passed as None.

    Returns:
        The DataFile built from the REST file's fields.
    """
    from pyiceberg.catalog.rest.scan_planning import CONTENT_TYPE_MAP

    # getattr with a None default tolerates content files that lack a metric attribute.
    raw_metrics = {
        name: getattr(rest_file, name, None)
        for name in ("column_sizes", "value_counts", "null_value_counts", "nan_value_counts")
    }

    def _metric(name: str) -> dict | None:
        value = raw_metrics[name]
        if include_stats and value:
            return value.to_dict()
        return None

    return DataFile.from_args(
        content=CONTENT_TYPE_MAP[rest_file.content],
        file_path=rest_file.file_path,
        file_format=rest_file.file_format,
        partition=Record(*rest_file.partition),
        record_count=rest_file.record_count,
        file_size_in_bytes=rest_file.file_size_in_bytes,
        column_sizes=_metric("column_sizes"),
        value_counts=_metric("value_counts"),
        null_value_counts=_metric("null_value_counts"),
        nan_value_counts=_metric("nan_value_counts"),
        split_offsets=rest_file.split_offsets,
        sort_order_id=rest_file.sort_order_id,
        spec_id=rest_file.spec_id,
    )
1874+
18011875

18021876
def _open_manifest(
18031877
io: FileIO,
@@ -1970,12 +2044,27 @@ def scan_plan_helper(self) -> Iterator[list[ManifestEntry]]:
19702044
],
19712045
)
19722046

1973-
def plan_files(self) -> Iterable[FileScanTask]:
1974-
"""Plans the relevant files by filtering on the PartitionSpecs.
2047+
def _should_use_rest_planning(self) -> bool:
    """Decide whether this scan is planned by the REST server.

    Returns:
        False when the scan has no catalog attached; otherwise whatever the
        catalog reports from ``is_rest_scan_planning_enabled()``.
    """
    return self.catalog is not None and self.catalog.is_rest_scan_planning_enabled()
2052+
2053+
def _plan_files_rest(self) -> Iterable[FileScanTask]:
    """Plan files using REST server-side scan planning.

    Builds a PlanTableScanRequest from this scan's snapshot, projection,
    row filter and case sensitivity, and delegates to the catalog's
    ``plan_scan``. A ``("*",)`` projection and an ALWAYS_TRUE filter are sent
    as None.

    Returns:
        The FileScanTasks produced by the catalog's ``plan_scan``.

    Raises:
        ValueError: If the scan has no catalog or no table identifier attached.
    """
    # Fail fast with a clear message instead of passing None into the REST call.
    if self.catalog is None or self.table_identifier is None:
        raise ValueError("REST scan planning requires both a catalog and a table identifier on the scan")

    from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest

    request = PlanTableScanRequest(
        snapshot_id=self.snapshot_id,
        select=list(self.selected_fields) if self.selected_fields != ("*",) else None,
        filter=self.row_filter if self.row_filter != ALWAYS_TRUE else None,
        case_sensitive=self.case_sensitive,
    )

    # plan_scan is not part of the base Catalog interface, hence the ignore.
    return self.catalog.plan_scan(self.table_identifier, request)  # type: ignore[attr-defined]
2065+
2066+
def _plan_files_local(self) -> Iterable[FileScanTask]:
2067+
"""Plan files locally by reading manifests."""
19792068
data_entries: list[ManifestEntry] = []
19802069
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
19812070

@@ -2006,6 +2095,20 @@ def plan_files(self) -> Iterable[FileScanTask]:
20062095
for data_entry in data_entries
20072096
]
20082097

2098+
def plan_files(self) -> Iterable[FileScanTask]:
    """Plan the files this scan has to read.

    Server-side planning is used when ``_should_use_rest_planning()`` says
    so (the table comes from a REST catalog with scan planning enabled);
    otherwise planning happens locally by reading manifests.

    Returns:
        FileScanTasks that contain both data and delete files.
    """
    planner = self._plan_files_rest if self._should_use_rest_planning() else self._plan_files_local
    return planner()
2111+
20092112
def to_arrow(self) -> pa.Table:
20102113
"""Read an Arrow table eagerly from this DataScan.
20112114

0 commit comments

Comments
 (0)