Skip to content

Commit f803333

Browse files
committed
feat: Add configurable commit retry logic for snapshot and transaction operations
1 parent 8ed913b commit f803333

7 files changed

Lines changed: 1710 additions & 18 deletions

File tree

pyiceberg/catalog/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@
6767
TableVersion,
6868
)
6969
from pyiceberg.utils.config import Config, merge_config
70-
from pyiceberg.utils.properties import property_as_bool
70+
from pyiceberg.utils.properties import property_as_bool, property_as_int
71+
from pyiceberg.utils.retry import RetryConfig, run_with_suppressed_failure
7172

7273
if TYPE_CHECKING:
7374
import pyarrow as pa

pyiceberg/table/__init__.py

Lines changed: 179 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from pydantic import Field
3636
from sortedcontainers import SortedList
3737

38+
from pyiceberg.exceptions import CommitFailedException
3839
import pyiceberg.expressions.parser as parser
3940
from pyiceberg.expressions import (
4041
AlwaysFalse,
@@ -104,6 +105,7 @@
104105
TableRequirement,
105106
TableUpdate,
106107
UpdatesAndRequirements,
108+
UpdateTableMetadata,
107109
UpgradeFormatVersionUpdate,
108110
update_table_metadata,
109111
)
@@ -132,7 +134,8 @@
132134
)
133135
from pyiceberg.utils.concurrent import ExecutorFactory
134136
from pyiceberg.utils.config import Config
135-
from pyiceberg.utils.properties import property_as_bool
137+
from pyiceberg.utils.properties import property_as_bool, property_as_int
138+
from pyiceberg.utils.retry import RetryConfig, run_with_retry
136139

137140
if TYPE_CHECKING:
138141
import bodo.pandas as bd
@@ -243,12 +246,55 @@ class TableProperties:
243246
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
244247
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1
245248

249+
COMMIT_NUM_RETRIES = "commit.retry.num-retries"
250+
COMMIT_NUM_RETRIES_DEFAULT = 4
251+
252+
COMMIT_MIN_RETRY_WAIT_MS = "commit.retry.min-wait-ms"
253+
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT = 100
254+
255+
COMMIT_MAX_RETRY_WAIT_MS = "commit.retry.max-wait-ms"
256+
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT = 60 * 1000 # 1 minute
257+
258+
COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"
259+
COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 30 * 60 * 1000 # 30 minutes
260+
261+
COMMIT_NUM_STATUS_CHECKS = "commit.status-check.num-retries"
262+
COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3
263+
264+
COMMIT_STATUS_CHECKS_MIN_WAIT_MS = "commit.status-check.min-wait-ms"
265+
COMMIT_STATUS_CHECKS_MIN_WAIT_MS_DEFAULT = 1000 # 1 second
266+
267+
COMMIT_STATUS_CHECKS_MAX_WAIT_MS = "commit.status-check.max-wait-ms"
268+
COMMIT_STATUS_CHECKS_MAX_WAIT_MS_DEFAULT = 60 * 1000 # 1 minute
269+
270+
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS = "commit.status-check.total-timeout-ms"
271+
COMMIT_STATUS_CHECKS_TOTAL_WAIT_MS_DEFAULT = 30 * 60 * 1000 # 30 minutes
272+
273+
274+
class _StaticUpdate:
275+
"""Wrapper for simple TableUpdates to make them retryable.
276+
277+
This class wraps TableUpdates that don't need regeneration on retry
278+
(like SetPropertiesUpdate, RemovePropertiesUpdate, UpgradeFormatVersionUpdate).
279+
"""
280+
281+
def __init__(self, updates: tuple[TableUpdate, ...], requirements: tuple[TableRequirement, ...] = ()):
282+
self._updates = updates
283+
self._requirements = requirements
284+
285+
def _commit(self) -> UpdatesAndRequirements:
286+
"""Return the stored updates and requirements."""
287+
return self._updates, self._requirements
288+
246289

247290
class Transaction:
248291
_table: Table
249292
_autocommit: bool
250293
_updates: tuple[TableUpdate, ...]
251294
_requirements: tuple[TableRequirement, ...]
295+
_pending_updates: list[_StaticUpdate | UpdateTableMetadata]
296+
# NOTE: Whenever _updates is modified, _working_metadata must be updated via update_table_metadata()
297+
_working_metadata: TableMetadata
252298

253299
def __init__(self, table: Table, autocommit: bool = False):
254300
"""Open a transaction to stage and commit changes to a table.
@@ -261,10 +307,13 @@ def __init__(self, table: Table, autocommit: bool = False):
261307
self._autocommit = autocommit
262308
self._updates = ()
263309
self._requirements = ()
310+
self._pending_updates = []
311+
self._working_metadata = table.metadata
264312

265313
@property
266314
def table_metadata(self) -> TableMetadata:
267-
return update_table_metadata(self._table.metadata, self._updates)
315+
"""Return the current working metadata with all updates applied."""
316+
return self._working_metadata
268317

269318
def __enter__(self) -> Transaction:
270319
"""Start a transaction to update the table."""
@@ -275,19 +324,31 @@ def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException |
275324
if exctype is None and excinst is None and exctb is None:
276325
self.commit_transaction()
277326

278-
def _apply(self, updates: tuple[TableUpdate, ...], requirements: tuple[TableRequirement, ...] = ()) -> Transaction:
327+
def _apply(
328+
self,
329+
updates: tuple[TableUpdate, ...],
330+
requirements: tuple[TableRequirement, ...] = (),
331+
pending_update: _StaticUpdate | UpdateTableMetadata | None = None,
332+
) -> Transaction:
279333
"""Check if the requirements are met, and applies the updates to the metadata."""
280334
for requirement in requirements:
281335
requirement.validate(self.table_metadata)
282336

283337
self._updates += updates
284338

339+
self._working_metadata = update_table_metadata(self._working_metadata, updates)
340+
285341
# For the requirements, it does not make sense to add a requirement more than once
286342
# For example, you cannot assert that the current schema has two different IDs
287-
existing_requirements = {type(requirement) for requirement in self._requirements}
343+
existing_requirements = {req.key() for req in self._requirements}
288344
for new_requirement in requirements:
289-
if type(new_requirement) not in existing_requirements:
290-
self._requirements = self._requirements + (new_requirement,)
345+
key = new_requirement.key()
346+
if key not in existing_requirements:
347+
self._requirements += (new_requirement,)
348+
existing_requirements.add(key)
349+
350+
if pending_update is not None:
351+
self._pending_updates.append(pending_update)
291352

292353
if self._autocommit:
293354
self.commit_transaction()
@@ -316,7 +377,8 @@ def upgrade_table_version(self, format_version: TableVersion) -> Transaction:
316377
raise ValueError(f"Cannot downgrade v{self.table_metadata.format_version} table to v{format_version}")
317378

318379
if format_version > self.table_metadata.format_version:
319-
return self._apply((UpgradeFormatVersionUpdate(format_version=format_version),))
380+
updates = (UpgradeFormatVersionUpdate(format_version=format_version),)
381+
return self._apply(updates, pending_update=_StaticUpdate(updates))
320382

321383
return self
322384

@@ -334,8 +396,9 @@ def set_properties(self, properties: Properties = EMPTY_DICT, **kwargs: Any) ->
334396
"""
335397
if properties and kwargs:
336398
raise ValueError("Cannot pass both properties and kwargs")
337-
updates = properties or kwargs
338-
return self._apply((SetPropertiesUpdate(updates=updates),))
399+
props = properties or kwargs
400+
updates = (SetPropertiesUpdate(updates=props),)
401+
return self._apply(updates, pending_update=_StaticUpdate(updates))
339402

340403
def _set_ref_snapshot(
341404
self,
@@ -928,7 +991,8 @@ def remove_properties(self, *removals: str) -> Transaction:
928991
Returns:
929992
The alter table builder.
930993
"""
931-
return self._apply((RemovePropertiesUpdate(removals=removals),))
994+
updates = (RemovePropertiesUpdate(removals=removals),)
995+
return self._apply(updates, pending_update=_StaticUpdate(updates))
932996

933997
def update_location(self, location: str) -> Transaction:
934998
"""Set the new table location.
@@ -949,15 +1013,117 @@ def commit_transaction(self) -> Table:
9491013
"""
9501014
if len(self._updates) > 0:
9511015
self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
1016+
1017+
if self._pending_updates:
1018+
self._commit_with_retry()
1019+
else:
1020+
self._table._do_commit( # pylint: disable=W0212
1021+
updates=self._updates,
1022+
requirements=self._requirements,
1023+
)
1024+
1025+
self._updates = ()
1026+
self._requirements = ()
1027+
self._pending_updates = []
1028+
1029+
return self._table
1030+
1031+
def _commit_with_retry(self) -> None:
1032+
"""Commit transaction with retry logic for snapshot operations.
1033+
1034+
On retry, refreshes table metadata and regenerates snapshots from
1035+
the pending snapshot producers.
1036+
"""
1037+
1038+
properties = self._table.metadata.properties
1039+
1040+
retry_config = RetryConfig(
1041+
max_attempts=property_as_int(
1042+
properties,
1043+
TableProperties.COMMIT_NUM_RETRIES,
1044+
TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
1045+
)
1046+
or TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
1047+
min_wait_ms=property_as_int(
1048+
properties,
1049+
TableProperties.COMMIT_MIN_RETRY_WAIT_MS,
1050+
TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
1051+
)
1052+
or TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
1053+
max_wait_ms=property_as_int(
1054+
properties,
1055+
TableProperties.COMMIT_MAX_RETRY_WAIT_MS,
1056+
TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
1057+
)
1058+
or TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
1059+
total_timeout_ms=property_as_int(
1060+
properties,
1061+
TableProperties.COMMIT_TOTAL_RETRY_TIME_MS,
1062+
TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
1063+
)
1064+
or TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
1065+
)
1066+
1067+
first_attempt = True
1068+
1069+
def do_commit() -> None:
1070+
nonlocal first_attempt
1071+
if first_attempt:
1072+
first_attempt = False
1073+
else:
1074+
# On retry, reapply all updates with refreshed metadata
1075+
self._reapply_updates()
1076+
9521077
self._table._do_commit( # pylint: disable=W0212
9531078
updates=self._updates,
9541079
requirements=self._requirements,
9551080
)
9561081

1082+
run_with_retry(
1083+
task=do_commit,
1084+
config=retry_config,
1085+
retry_on=(CommitFailedException,),
1086+
)
1087+
1088+
def _reapply_updates(self) -> None:
1089+
"""Reapply all updates after refreshing table metadata.
1090+
1091+
This is called on retry to regenerate all pending updates
1092+
based on the latest table metadata, similar to Java's BaseTransaction.applyUpdates().
1093+
1094+
All updates are rebuilt from _pending_updates to ensure consistency.
1095+
This includes:
1096+
- Static updates (properties, format version) via _StaticUpdate
1097+
- Snapshot operations via snapshot producers with _reset_state()
1098+
1099+
NOTE: Every operation that should survive retry must be tracked in _pending_updates.
1100+
Simple operations use _StaticUpdate wrapper, complex operations (like snapshot
1101+
producers) implement _reset_state() and _commit() directly.
1102+
"""
1103+
self._table.refresh()
1104+
9571105
self._updates = ()
9581106
self._requirements = ()
1107+
self._working_metadata = self._table.metadata
9591108

960-
return self._table
1109+
for pending_update in self._pending_updates:
1110+
# NOTE: When adding new cached properties to snapshot producers,
1111+
# ensure they are cleared in _reset_state() to avoid stale data on retry
1112+
if hasattr(pending_update, "_reset_state"):
1113+
pending_update._reset_state()
1114+
1115+
updates, requirements = pending_update._commit()
1116+
self._updates += updates
1117+
self._working_metadata = update_table_metadata(self._working_metadata, updates)
1118+
1119+
existing_requirements = {req.key() for r in self._requirements}
1120+
for req in requirements:
1121+
key = req.key()
1122+
if key not in existing_requirements:
1123+
self._requirements += (req,)
1124+
existing_requirements.add(key)
1125+
1126+
self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
9611127

9621128

9631129
class CreateTableTransaction(Transaction):
@@ -995,6 +1161,8 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None:
9951161
SetPropertiesUpdate(updates=table_metadata.properties),
9961162
)
9971163

1164+
self._working_metadata = update_table_metadata(self._working_metadata, self._updates)
1165+
9981166
def __init__(self, table: StagedTable):
9991167
super().__init__(table, autocommit=False)
10001168
self._initial_changes(table.metadata)

pyiceberg/table/update/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ def __init__(self, transaction: Transaction) -> None:
6969
def _commit(self) -> UpdatesAndRequirements: ...
7070

7171
def commit(self) -> None:
72-
self._transaction._apply(*self._commit())
72+
updates, requirements = self._commit()
73+
self._transaction._apply(updates, requirements, pending_update=self)
7374

7475
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
7576
"""Close and commit the change."""
@@ -753,6 +754,10 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
753754
"""
754755
...
755756

757+
def key(self) -> tuple:
758+
"""Return a deduplication key for this requirement."""
759+
return (type(self),)
760+
756761

757762
class AssertCreate(ValidatableTableRequirement):
758763
"""The table must not already exist; used for create transactions."""
@@ -811,6 +816,9 @@ def validate(self, base_metadata: TableMetadata | None) -> None:
811816
elif self.snapshot_id is not None:
812817
raise CommitFailedException(f"Requirement failed: branch or tag {self.ref} is missing, expected {self.snapshot_id}")
813818

819+
def key(self) -> tuple:
820+
return (type(self), self.ref)
821+
814822

815823
class AssertLastAssignedFieldId(ValidatableTableRequirement):
816824
"""The table's last assigned column id must match the requirement's `last-assigned-field-id`."""

0 commit comments

Comments
 (0)