Skip to content

Commit 1241215

Browse files
committed
prune manifest in overwrite operations
1 parent 78615d2 commit 1241215

2 files changed

Lines changed: 99 additions & 83 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,16 +396,18 @@ def _set_ref_snapshot(
396396

397397
return updates, requirements
398398

399-
def _build_partition_predicate(self, partition_records: set[Record]) -> BooleanExpression:
399+
def _build_partition_predicate(
400+
self, partition_records: set[Record], spec: PartitionSpec | None = None, schema: Schema | None = None
401+
) -> BooleanExpression:
400402
"""Build a filter predicate matching any of the input partition records.
401403
402404
Args:
403405
partition_records: A set of partition records to match
404406
Returns:
405407
A predicate matching any of the input partition records.
406408
"""
407-
partition_spec = self.table_metadata.spec()
408-
schema = self.table_metadata.schema()
409+
partition_spec = spec or self.table_metadata.spec()
410+
schema = schema or self.table_metadata.schema()
409411
partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
410412

411413
expr: BooleanExpression = AlwaysFalse()

pyiceberg/table/update/snapshot.py

Lines changed: 94 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,7 @@
2626
from typing import TYPE_CHECKING, Generic
2727

2828
from pyiceberg.avro.codecs import AvroCompressionCodec
29-
from pyiceberg.expressions import (
30-
AlwaysFalse,
31-
BooleanExpression,
32-
Or,
33-
)
29+
from pyiceberg.expressions import AlwaysFalse, BooleanExpression, Or
3430
from pyiceberg.expressions.visitors import (
3531
ROWS_MIGHT_NOT_MATCH,
3632
ROWS_MUST_MATCH,
@@ -51,9 +47,8 @@
5147
write_manifest,
5248
write_manifest_list,
5349
)
54-
from pyiceberg.partitioning import (
55-
PartitionSpec,
56-
)
50+
from pyiceberg.partitioning import PartitionSpec
51+
from pyiceberg.schema import Schema
5752
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRefType
5853
from pyiceberg.table.snapshots import (
5954
Operation,
@@ -76,10 +71,7 @@
7671
UpdatesAndRequirements,
7772
UpdateTableMetadata,
7873
)
79-
from pyiceberg.typedef import (
80-
EMPTY_DICT,
81-
KeyDefaultDict,
82-
)
74+
from pyiceberg.typedef import EMPTY_DICT, KeyDefaultDict, Record
8375
from pyiceberg.utils.bin_packing import ListPacker
8476
from pyiceberg.utils.concurrent import ExecutorFactory
8577
from pyiceberg.utils.datetime import datetime_to_millis
@@ -110,6 +102,8 @@ class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
110102
_deleted_data_files: set[DataFile]
111103
_compression: AvroCompressionCodec
112104
_target_branch: str | None
105+
_predicate: BooleanExpression
106+
_case_sensitive: bool
113107

114108
def __init__(
115109
self,
@@ -138,6 +132,8 @@ def __init__(
138132
self._parent_snapshot_id = (
139133
snapshot.snapshot_id if (snapshot := self._transaction.table_metadata.snapshot_by_name(self._target_branch)) else None
140134
)
135+
self._predicate = AlwaysFalse()
136+
self._case_sensitive = True
141137

142138
def _validate_target_branch(self, branch: str | None) -> str | None:
143139
# if branch is none, write will be written into a staging snapshot
@@ -185,7 +181,7 @@ def _write_added_manifest() -> list[ManifestFile]:
185181
with write_manifest(
186182
format_version=self._transaction.table_metadata.format_version,
187183
spec=self._transaction.table_metadata.spec(),
188-
schema=self._transaction.table_metadata.schema(),
184+
schema=self.schema(),
189185
output_file=self.new_manifest_output(),
190186
snapshot_id=self._snapshot_id,
191187
avro_compression=self._compression,
@@ -216,7 +212,7 @@ def _write_delete_manifest() -> list[ManifestFile]:
216212
with write_manifest(
217213
format_version=self._transaction.table_metadata.format_version,
218214
spec=self._transaction.table_metadata.specs()[spec_id],
219-
schema=self._transaction.table_metadata.schema(),
215+
schema=self.schema(),
220216
output_file=self.new_manifest_output(),
221217
snapshot_id=self._snapshot_id,
222218
avro_compression=self._compression,
@@ -344,14 +340,20 @@ def _commit(self) -> UpdatesAndRequirements:
344340
def snapshot_id(self) -> int:
345341
return self._snapshot_id
346342

343+
def schema(self, schema_id: int | None = None) -> Schema:
344+
if schema_id:
345+
if schema := self._transaction.table_metadata.schema_by_id(schema_id):
346+
return schema
347+
return self._transaction.table_metadata.schema()
348+
347349
def spec(self, spec_id: int) -> PartitionSpec:
348350
return self._transaction.table_metadata.specs()[spec_id]
349351

350352
def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
351353
return write_manifest(
352354
format_version=self._transaction.table_metadata.format_version,
353355
spec=spec,
354-
schema=self._transaction.table_metadata.schema(),
356+
schema=self.schema(),
355357
output_file=self.new_manifest_output(),
356358
snapshot_id=self._snapshot_id,
357359
avro_compression=self._compression,
@@ -366,6 +368,23 @@ def new_manifest_output(self) -> OutputFile:
366368
def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> list[ManifestEntry]:
367369
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
368370

371+
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
372+
spec = self._transaction.table_metadata.specs()[spec_id]
373+
project = inclusive_projection(self.schema(), spec, self._case_sensitive)
374+
return project(self._predicate)
375+
376+
@cached_property
377+
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
378+
return KeyDefaultDict(self._build_partition_projection)
379+
380+
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
381+
spec = self._transaction.table_metadata.specs()[spec_id]
382+
return manifest_evaluator(spec, self.schema(), self.partition_filters[spec_id], self._case_sensitive)
383+
384+
def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None:
385+
self._predicate = Or(self._predicate, predicate)
386+
self._case_sensitive = case_sensitive
387+
369388

370389
class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
371390
"""Will delete manifest entries from the current snapshot based on the predicate.
@@ -377,48 +396,13 @@ class _DeleteFiles(_SnapshotProducer["_DeleteFiles"]):
377396
From the specification
378397
"""
379398

380-
_predicate: BooleanExpression
381-
_case_sensitive: bool
382-
383-
def __init__(
384-
self,
385-
operation: Operation,
386-
transaction: Transaction,
387-
io: FileIO,
388-
branch: str | None = MAIN_BRANCH,
389-
commit_uuid: uuid.UUID | None = None,
390-
snapshot_properties: dict[str, str] = EMPTY_DICT,
391-
):
392-
super().__init__(operation, transaction, io, commit_uuid, snapshot_properties, branch)
393-
self._predicate = AlwaysFalse()
394-
self._case_sensitive = True
395-
396399
def _commit(self) -> UpdatesAndRequirements:
397400
# Only produce a commit when there is something to delete
398401
if self.files_affected:
399402
return super()._commit()
400403
else:
401404
return (), ()
402405

403-
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
404-
schema = self._transaction.table_metadata.schema()
405-
spec = self._transaction.table_metadata.specs()[spec_id]
406-
project = inclusive_projection(schema, spec, self._case_sensitive)
407-
return project(self._predicate)
408-
409-
@cached_property
410-
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
411-
return KeyDefaultDict(self._build_partition_projection)
412-
413-
def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
414-
schema = self._transaction.table_metadata.schema()
415-
spec = self._transaction.table_metadata.specs()[spec_id]
416-
return manifest_evaluator(spec, schema, self.partition_filters[spec_id], self._case_sensitive)
417-
418-
def delete_by_predicate(self, predicate: BooleanExpression, case_sensitive: bool = True) -> None:
419-
self._predicate = Or(self._predicate, predicate)
420-
self._case_sensitive = case_sensitive
421-
422406
@cached_property
423407
def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], bool]:
424408
"""Computes all the delete operation and cache it when nothing changes.
@@ -428,7 +412,6 @@ def _compute_deletes(self) -> tuple[list[ManifestFile], list[ManifestEntry], boo
428412
- The manifest-entries that are deleted based on the metadata.
429413
- Flag indicating that rewrites of data-files are needed.
430414
"""
431-
schema = self._transaction.table_metadata.schema()
432415

433416
def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
434417
return ManifestEntry.from_args(
@@ -442,9 +425,11 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
442425
)
443426

444427
manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
445-
strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=self._case_sensitive).eval
428+
strict_metrics_evaluator = _StrictMetricsEvaluator(
429+
self.schema(), self._predicate, case_sensitive=self._case_sensitive
430+
).eval
446431
inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(
447-
schema, self._predicate, case_sensitive=self._case_sensitive
432+
self.schema(), self._predicate, case_sensitive=self._case_sensitive
448433
).eval
449434

450435
existing_manifests = []
@@ -486,7 +471,7 @@ def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) ->
486471
with write_manifest(
487472
format_version=self._transaction.table_metadata.format_version,
488473
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
489-
schema=self._transaction.table_metadata.schema(),
474+
schema=self.schema(),
490475
output_file=self.new_manifest_output(),
491476
snapshot_id=self._snapshot_id,
492477
avro_compression=self._compression,
@@ -609,36 +594,65 @@ def _existing_manifests(self) -> list[ManifestFile]:
609594
"""Determine if there are any existing manifest files."""
610595
existing_files = []
611596

597+
partition_to_overwrite: dict[int, set[Record]] = {}
598+
for data_file in self._deleted_data_files:
599+
group = partition_to_overwrite.setdefault(data_file.spec_id, set())
600+
group.add(data_file.partition)
601+
602+
for spec_id, data_files in partition_to_overwrite.items():
603+
self.delete_by_predicate(
604+
self._transaction._build_partition_predicate(
605+
partition_records=data_files, schema=self.schema(), spec=self.spec(spec_id)
606+
)
607+
)
608+
609+
manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
612610
if snapshot := self._transaction.table_metadata.snapshot_by_name(name=self._target_branch):
613611
for manifest_file in snapshot.manifests(io=self._io):
614-
entries = manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True)
615-
found_deleted_data_files = [entry.data_file for entry in entries if entry.data_file in self._deleted_data_files]
612+
# The manifest cannot contain files in any partition of the files being deleted, so keep it unchanged
613+
if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
614+
existing_files.append(manifest_file)
615+
continue
616616

617-
if len(found_deleted_data_files) == 0:
617+
entries_to_write: set[ManifestEntry] = set()
618+
found_deleted_entries: set[ManifestEntry] = set()
619+
620+
for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
621+
if entry.data_file in self._deleted_data_files:
622+
found_deleted_entries.add(entry)
623+
else:
624+
entries_to_write.add(entry)
625+
626+
# Is the intersection with the files to delete the empty set?
627+
if len(found_deleted_entries) == 0:
618628
existing_files.append(manifest_file)
619-
else:
620-
# We have to rewrite the manifest file without the deleted data files
621-
if any(entry.data_file not in found_deleted_data_files for entry in entries):
622-
with write_manifest(
623-
format_version=self._transaction.table_metadata.format_version,
624-
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
625-
schema=self._transaction.table_metadata.schema(),
626-
output_file=self.new_manifest_output(),
627-
snapshot_id=self._snapshot_id,
628-
avro_compression=self._compression,
629-
) as writer:
630-
for entry in entries:
631-
if entry.data_file not in found_deleted_data_files:
632-
writer.add_entry(
633-
ManifestEntry.from_args(
634-
status=ManifestEntryStatus.EXISTING,
635-
snapshot_id=entry.snapshot_id,
636-
sequence_number=entry.sequence_number,
637-
file_sequence_number=entry.file_sequence_number,
638-
data_file=entry.data_file,
639-
)
640-
)
641-
existing_files.append(writer.to_manifest_file())
629+
continue
630+
631+
# Every entry in this manifest is deleted, so drop the whole manifest instead of rewriting it
632+
if len(entries_to_write) == 0:
633+
continue
634+
635+
# We have to rewrite the manifest file without the deleted data files
636+
with write_manifest(
637+
format_version=self._transaction.table_metadata.format_version,
638+
spec=self.spec(manifest_file.partition_spec_id),
639+
schema=self.schema(),
640+
output_file=self.new_manifest_output(),
641+
snapshot_id=self._snapshot_id,
642+
avro_compression=self._compression,
643+
) as writer:
644+
for entry in entries_to_write:
645+
writer.add_entry(
646+
ManifestEntry.from_args(
647+
status=ManifestEntryStatus.EXISTING,
648+
snapshot_id=entry.snapshot_id,
649+
sequence_number=entry.sequence_number,
650+
file_sequence_number=entry.file_sequence_number,
651+
data_file=entry.data_file,
652+
)
653+
)
654+
existing_files.append(writer.to_manifest_file())
655+
642656
return existing_files
643657

644658
def _deleted_entries(self) -> list[ManifestEntry]:

0 commit comments

Comments
 (0)