Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 18 additions & 60 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,12 @@
from functools import cached_property
from itertools import chain
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
TypeVar,
)
from typing import TYPE_CHECKING, Any, TypeVar

from pydantic import Field

import pyiceberg.expressions.parser as parser
from pyiceberg.expressions import (
AlwaysFalse,
AlwaysTrue,
And,
BooleanExpression,
EqualTo,
IsNull,
Or,
Reference,
)
from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, BooleanExpression, EqualTo, IsNull, Or, Reference
from pyiceberg.expressions.visitors import (
ResidualEvaluator,
_InclusiveMetricsEvaluator,
Expand All @@ -54,36 +41,17 @@
manifest_evaluator,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
DataFile,
DataFileContent,
ManifestContent,
ManifestEntry,
ManifestFile,
)
from pyiceberg.partitioning import (
PARTITION_FIELD_ID_START,
UNPARTITIONED_PARTITION_SPEC,
PartitionKey,
PartitionSpec,
)
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestFile
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.delete_file_index import DeleteFileIndex
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.maintenance import MaintenanceTable
from pyiceberg.table.metadata import (
INITIAL_SEQUENCE_NUMBER,
TableMetadata,
)
from pyiceberg.table.name_mapping import (
NameMapping,
)
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
from pyiceberg.table.snapshots import (
Snapshot,
SnapshotLogEntry,
)
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
AddPartitionSpecUpdate,
Expand All @@ -107,11 +75,7 @@
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.table.update.snapshot import (
ManageSnapshots,
UpdateSnapshot,
_FastAppendFiles,
)
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
from pyiceberg.table.update.sorting import UpdateSortOrder
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
Expand All @@ -126,9 +90,7 @@
Record,
TableVersion,
)
from pyiceberg.types import (
strtobool,
)
from pyiceberg.types import strtobool
from pyiceberg.utils.concurrent import ExecutorFactory
from pyiceberg.utils.config import Config
from pyiceberg.utils.properties import property_as_bool
Expand All @@ -144,11 +106,7 @@
from pyiceberg_core.datafusion import IcebergDataFusionTable

from pyiceberg.catalog import Catalog
from pyiceberg.catalog.rest.scan_planning import (
RESTContentFile,
RESTDeleteFile,
RESTFileScanTask,
)
from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask

ALWAYS_TRUE = AlwaysTrue()
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
Expand Down Expand Up @@ -396,16 +354,20 @@ def _set_ref_snapshot(

return updates, requirements

def _build_partition_predicate(self, partition_records: set[Record]) -> BooleanExpression:
def _build_partition_predicate(
self, partition_records: set[Record], spec: PartitionSpec | None = None, schema: Schema | None = None
) -> BooleanExpression:
"""Build a filter predicate matching any of the input partition records.

Args:
partition_records: A set of partition records to match
spec: An optional partition spec; if not provided, defaults to the table's current partition spec
schema: An optional schema; if not provided, defaults to the table's current schema
Returns:
A predicate matching any of the input partition records.
"""
partition_spec = self.table_metadata.spec()
schema = self.table_metadata.schema()
partition_spec = spec or self.table_metadata.spec()
schema = schema or self.table_metadata.schema()
partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]

expr: BooleanExpression = AlwaysFalse()
Expand Down Expand Up @@ -673,11 +635,7 @@ def delete(
case_sensitive: A bool that determines whether the provided `delete_filter` is case-sensitive
branch: Branch Reference to run the delete operation
"""
from pyiceberg.io.pyarrow import (
ArrowScan,
_dataframe_to_data_files,
_expression_to_complementary_pyarrow,
)
from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow

if (
self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)
Expand Down
Loading