Commit 7875f21

fix bug and add tests
1 parent 7c4ecf8 commit 7875f21

5 files changed: +441, -130 lines

pyiceberg/table/__init__.py

Lines changed: 12 additions & 26 deletions
@@ -1032,38 +1032,25 @@ def commit_transaction(self) -> Table:
         return self._table

     def _commit_with_retry(self) -> None:
-        """Commit transaction with retry logic for snapshot operations.
+        """Commit transaction with retry logic.

         On retry, refreshes table metadata and regenerates snapshots from
         the pending snapshot producers.
         """
         properties = self._table.metadata.properties

+        max_attempts = property_as_int(properties, TableProperties.COMMIT_NUM_RETRIES)
+        min_wait_ms = property_as_int(properties, TableProperties.COMMIT_MIN_RETRY_WAIT_MS)
+        max_wait_ms = property_as_int(properties, TableProperties.COMMIT_MAX_RETRY_WAIT_MS)
+        total_timeout_ms = property_as_int(properties, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS)
+
         retry_config = RetryConfig(
-            max_attempts=property_as_int(
-                properties,
-                TableProperties.COMMIT_NUM_RETRIES,
-                TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
-            )
-            or TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
-            min_wait_ms=property_as_int(
-                properties,
-                TableProperties.COMMIT_MIN_RETRY_WAIT_MS,
-                TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
-            )
-            or TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
-            max_wait_ms=property_as_int(
-                properties,
-                TableProperties.COMMIT_MAX_RETRY_WAIT_MS,
-                TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
-            )
-            or TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
-            total_timeout_ms=property_as_int(
-                properties,
-                TableProperties.COMMIT_TOTAL_RETRY_TIME_MS,
-                TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
-            )
-            or TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+            max_attempts=max_attempts if max_attempts is not None else TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
+            min_wait_ms=min_wait_ms if min_wait_ms is not None else TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+            max_wait_ms=max_wait_ms if max_wait_ms is not None else TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+            total_timeout_ms=total_timeout_ms
+            if total_timeout_ms is not None
+            else TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
         )

         first_attempt = True
@@ -1073,7 +1060,6 @@ def do_commit() -> None:
             if first_attempt:
                 first_attempt = False
             else:
-                # On retry, reapply all updates with refreshed metadata
                 self._reapply_updates()

             self._table._do_commit(  # pylint: disable=W0212
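The substance of this change is the fallback logic for the retry properties. The old expression `property_as_int(..., DEFAULT) or DEFAULT` falls back whenever the lookup returns a falsy value, so an explicitly configured `0` is silently replaced by the default; the new code falls back only when the property is actually unset. A minimal, self-contained sketch of the difference, using placeholder names and values rather than the real pyiceberg helpers and constants:

# Illustrative only: a stand-in for property_as_int and a made-up default,
# not the actual pyiceberg API.
def property_as_int(properties: dict[str, str], key: str) -> int | None:
    value = properties.get(key)
    return int(value) if value is not None else None

DEFAULT_RETRIES = 4
props = {"commit.retry.num-retries": "0"}  # caller explicitly disables retries

value = property_as_int(props, "commit.retry.num-retries")

old_style = value or DEFAULT_RETRIES                          # -> 4, the explicit 0 is lost
new_style = value if value is not None else DEFAULT_RETRIES   # -> 0, as configured

print(old_style, new_style)

Hoisting the lookups into local variables also lets each `is not None` check read naturally inside the RetryConfig(...) call.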

pyiceberg/table/update/schema.py

Lines changed: 53 additions & 59 deletions
@@ -90,14 +90,9 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
     _allow_incompatible_changes: bool
     _case_sensitive: bool

-    # Store user intent for retry support
-    _column_additions: list[tuple[str | tuple[str, ...], IcebergType, str | None, bool, Any]]
-    _column_updates: list[tuple[str | tuple[str, ...], IcebergType | None, bool | None, str | None]]
-    _column_deletions: list[str | tuple[str, ...]]
-    _column_renames: list[tuple[str | tuple[str, ...], str]]
-    _move_operations: list[tuple[str, str | tuple[str, ...], str | tuple[str, ...] | None]]
-    _optional_columns: list[str | tuple[str, ...]]
-    _default_value_updates: list[tuple[str | tuple[str, ...], Any]]
+    # Store all operations and order for retry support.
+    # This _operations overlaps _adds, _updates and other intermediate variables.
+    _operations: list[tuple[Any, ...]]

     def __init__(
         self,
@@ -113,18 +108,8 @@ def __init__(
         self._case_sensitive = case_sensitive
         self._name_mapping = name_mapping
         self._provided_schema = schema  # Store for _reset_state
+        self._operations = []  # for retry support

-        # Initialize user intent storage
-        self._column_additions = []
-        self._column_updates = []
-        self._column_deletions = []
-        self._column_renames = []
-        self._move_operations = []
-        self._optional_columns = []
-        self._default_value_updates = []
-        self._identifier_field_updates: set[str] | None = None
-
-        # Initialize state from metadata
         self._init_state_from_metadata(schema)

     def _init_state_from_metadata(self, schema: Schema | None = None) -> None:
@@ -155,38 +140,48 @@ def get_column_name(field_id: int) -> str:
         }

     def _reset_state(self) -> None:
-        """Reset state for retry, rebuilding from refreshed metadata."""
-        self._init_state_from_metadata(self._provided_schema)
-
-        for path, field_type, doc, required, default_value in self._column_additions:
-            self._do_add_column(path, field_type, doc, required, default_value)
-
-        for path in self._column_deletions:
-            self._do_delete_column(path)
-
-        for path_from, new_name in self._column_renames:
-            self._do_rename_column(path_from, new_name)
-
-        for upd_path, upd_field_type, upd_required, upd_doc in self._column_updates:
-            self._do_update_column(upd_path, upd_field_type, upd_required, upd_doc)
+        """Reset state for retry, rebuilding from refreshed metadata.

-        for path in self._optional_columns:
-            self._set_column_requirement(path, required=False)
-
-        for path, default_value in self._default_value_updates:
-            self._set_column_default_value(path, default_value)
+        This is called on transaction retry to reapply the schema changes on top of the refreshed table metadata.
+        """
+        self._init_state_from_metadata(self._provided_schema)

-        for op, path, other_path in self._move_operations:
-            if op == "first":
+        # Refresh name mapping from the latest table metadata to avoid overwriting concurrent changes
+        if self._name_mapping is not None:
+            self._name_mapping = self._transaction.table_metadata.name_mapping()
+
+        for operation in self._operations:
+            op_type = operation[0]
+            if op_type == "add":
+                _, path, field_type, doc, required, default_value = operation
+                self._do_add_column(path, field_type, doc, required, default_value)
+            elif op_type == "delete":
+                _, path = operation
+                self._do_delete_column(path)
+            elif op_type == "rename":
+                _, path_from, new_name = operation
+                self._do_rename_column(path_from, new_name)
+            elif op_type == "update":
+                _, path, field_type, required, doc = operation
+                self._do_update_column(path, field_type, required, doc)
+            elif op_type == "optional":
+                _, path = operation
+                self._set_column_requirement(path, required=False)
+            elif op_type == "default_value":
+                _, path, default_value = operation
+                self._set_column_default_value(path, default_value)
+            elif op_type == "move_first":
+                _, path = operation
                 self._do_move_first(path)
-            elif op == "before":
-                self._do_move_before(path, other_path)  # type: ignore
-            elif op == "after":
-                self._do_move_after(path, other_path)  # type: ignore
-
-        # Restore identifier fields if they were explicitly set
-        if self._identifier_field_updates is not None:
-            self._identifier_field_names = self._identifier_field_updates.copy()
+            elif op_type == "move_before":
+                _, path, before_path = operation
+                self._do_move_before(path, before_path)
+            elif op_type == "move_after":
+                _, path, after_name = operation
+                self._do_move_after(path, after_name)
+            elif op_type == "set_identifier_fields":
+                _, fields = operation
+                self._identifier_field_names = set(fields)

     def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
         """Determine if the case of schema needs to be considered when comparing column names.
@@ -243,7 +238,7 @@ def add_column(
         Returns:
             This for method chaining.
         """
-        self._column_additions.append((path, field_type, doc, required, default_value))
+        self._operations.append(("add", path, field_type, doc, required, default_value))
         self._do_add_column(path, field_type, doc, required, default_value)
         return self

@@ -335,7 +330,7 @@ def delete_column(self, path: str | tuple[str, ...]) -> UpdateSchema:
         Returns:
             The UpdateSchema with the delete operation staged.
         """
-        self._column_deletions.append(path)
+        self._operations.append(("delete", path))
         self._do_delete_column(path)
         return self

@@ -362,7 +357,7 @@ def set_default_value(self, path: str | tuple[str, ...], default_value: L | None
         Returns:
             The UpdateSchema with the delete operation staged.
         """
-        self._default_value_updates.append((path, default_value))
+        self._operations.append(("default_value", path, default_value))
         self._set_column_default_value(path, default_value)
         return self

@@ -376,7 +371,7 @@ def rename_column(self, path_from: str | tuple[str, ...], new_name: str) -> Upda
         Returns:
             The UpdateSchema with the rename operation staged.
         """
-        self._column_renames.append((path_from, new_name))
+        self._operations.append(("rename", path_from, new_name))
         self._do_rename_column(path_from, new_name)
         return self

@@ -425,12 +420,12 @@ def make_column_optional(self, path: str | tuple[str, ...]) -> UpdateSchema:
         Returns:
             The UpdateSchema with the requirement change staged.
         """
-        self._optional_columns.append(path)
+        self._operations.append(("optional", path))
         self._set_column_requirement(path, required=False)
         return self

     def set_identifier_fields(self, *fields: str) -> None:
-        self._identifier_field_updates = set(fields)
+        self._operations.append(("set_identifier_fields", fields))
         self._identifier_field_names = set(fields)

     def _set_column_requirement(self, path: str | tuple[str, ...], required: bool) -> None:
@@ -535,8 +530,7 @@ def update_column(
         if field_type is None and required is None and doc is None:
             return self

-        # Store intent for retry support
-        self._column_updates.append((path, field_type, required, doc))
+        self._operations.append(("update", path, field_type, required, doc))
         self._do_update_column(path, field_type, required, doc)
         return self

@@ -633,7 +627,7 @@ def move_first(self, path: str | tuple[str, ...]) -> UpdateSchema:
         Returns:
             The UpdateSchema with the move operation staged.
         """
-        self._move_operations.append(("first", path, None))
+        self._operations.append(("move_first", path))
         self._do_move_first(path)
         return self

@@ -657,7 +651,7 @@ def move_before(self, path: str | tuple[str, ...], before_path: str | tuple[str,
         Returns:
             The UpdateSchema with the move operation staged.
         """
-        self._move_operations.append(("before", path, before_path))
+        self._operations.append(("move_before", path, before_path))
         self._do_move_before(path, before_path)
         return self

@@ -695,7 +689,7 @@ def move_after(self, path: str | tuple[str, ...], after_name: str | tuple[str, .
         Returns:
             The UpdateSchema with the move operation staged.
         """
-        self._move_operations.append(("after", path, after_name))
+        self._operations.append(("move_after", path, after_name))
         self._do_move_after(path, after_name)
         return self

pyiceberg/table/update/spec.py

Lines changed: 34 additions & 13 deletions
@@ -58,17 +58,15 @@ class UpdateSpec(UpdateTableMetadata["UpdateSpec"]):
     _adds: list[PartitionField]
     _deletes: set[int]
     _last_assigned_partition_id: int
-    # Store (source_column_name, transform, partition_field_name) for retry support
-    _field_additions: list[tuple[str, Transform[Any, Any], str | None]]
+    # Store all operations and order for retry support.
+    _operations: list[tuple[Any, ...]]

     def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None:
         super().__init__(transaction)
         self._transaction = transaction
         self._case_sensitive = case_sensitive
-        self._field_additions = []
-        self._deletes = set()
-        self._renames = {}
-        # Initialize state from current metadata
+        self._operations = []
+
         self._init_state_from_metadata()

     def _init_state_from_metadata(self) -> None:
@@ -78,6 +76,8 @@ def _init_state_from_metadata(self) -> None:
         self._transform_to_field = {(field.source_id, repr(field.transform)): field for field in spec.fields}
         self._last_assigned_partition_id = self._transaction.table_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1
         # Clear intermediate state
+        self._deletes = set()
+        self._renames = {}
         self._name_to_added_field = {}
         self._transform_to_added_field = {}
         self._added_time_fields = {}
@@ -89,8 +89,18 @@ def _reset_state(self) -> None:
         This is called on transaction retry to reapply the spec changes on top of the refreshed table metadata.
         """
         self._init_state_from_metadata()
-        for source_column_name, transform, partition_field_name in self._field_additions:
-            self._do_add_field(source_column_name, transform, partition_field_name)
+
+        for operation in self._operations:
+            op_type = operation[0]
+            if op_type == "add":
+                _, source_column_name, transform, partition_field_name = operation
+                self._do_add_field(source_column_name, transform, partition_field_name)
+            elif op_type == "remove":
+                _, name = operation
+                self._do_remove_field(name)
+            elif op_type == "rename":
+                _, name, new_name = operation
+                self._do_rename_field(name, new_name)

     def _do_add_field(
         self,
@@ -128,7 +138,7 @@ def _do_add_field(
         existing_partition_field = self._name_to_field.get(new_field.name)
         if existing_partition_field and new_field.field_id not in self._deletes:
             if isinstance(existing_partition_field.transform, VoidTransform):
-                self.rename_field(
+                self._do_rename_field(
                     existing_partition_field.name, existing_partition_field.name + "_" + str(existing_partition_field.field_id)
                 )
             else:
@@ -144,14 +154,15 @@ def add_field(
         partition_field_name: str | None = None,
     ) -> UpdateSpec:
         transform = parse_transform(transform)
-        self._field_additions.append((source_column_name, transform, partition_field_name))
+        self._operations.append(("add", source_column_name, transform, partition_field_name))
         self._do_add_field(source_column_name, transform, partition_field_name)
         return self

     def add_identity(self, source_column_name: str) -> UpdateSpec:
         return self.add_field(source_column_name, IdentityTransform(), None)

-    def remove_field(self, name: str) -> UpdateSpec:
+    def _do_remove_field(self, name: str) -> None:
+        """Remove a partition field (internal implementation for retry support)."""
         added = self._name_to_added_field.get(name)
         if added:
             raise ValueError(f"Cannot delete newly added field {name}")
@@ -163,12 +174,18 @@ def remove_field(self, name: str) -> UpdateSpec:
             raise ValueError(f"No such partition field: {name}")

         self._deletes.add(field.field_id)
+
+    def remove_field(self, name: str) -> UpdateSpec:
+        self._operations.append(("remove", name))
+        self._do_remove_field(name)
         return self

-    def rename_field(self, name: str, new_name: str) -> UpdateSpec:
+    def _do_rename_field(self, name: str, new_name: str) -> None:
+        """Rename a partition field (internal implementation for retry support)."""
         existing_field = self._name_to_field.get(new_name)
         if existing_field and isinstance(existing_field.transform, VoidTransform):
-            return self.rename_field(name, name + "_" + str(existing_field.field_id))
+            self._do_rename_field(name, name + "_" + str(existing_field.field_id))
+            return
         added = self._name_to_added_field.get(name)
         if added:
             raise ValueError("Cannot rename recently added partitions")
@@ -178,6 +195,10 @@ def rename_field(self, name: str, new_name: str) -> UpdateSpec:
         if field.field_id in self._deletes:
             raise ValueError(f"Cannot delete and rename partition field {name}")
         self._renames[name] = new_name
+
+    def rename_field(self, name: str, new_name: str) -> UpdateSpec:
+        self._operations.append(("rename", name, new_name))
+        self._do_rename_field(name, new_name)
         return self

     def _commit(self) -> UpdatesAndRequirements:
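UpdateSpec gets the same split between public recording methods and private `_do_*` implementations, and internal callers such as `_do_add_field` now rename conflicting void-transform fields through `_do_rename_field` rather than the public `rename_field`. The point of that detail is to keep implicit or replayed changes out of the operation log. A tiny illustrative sketch of the pitfall being avoided (generic names, not the pyiceberg UpdateSpec API):

# Illustrative only; not the pyiceberg UpdateSpec API.
class SpecBuilder:
    def __init__(self) -> None:
        self._operations: list[tuple[str, ...]] = []
        self._renames: dict[str, str] = {}

    def _do_rename_field(self, name: str, new_name: str) -> None:
        self._renames[name] = new_name

    def rename_field(self, name: str, new_name: str) -> "SpecBuilder":
        self._operations.append(("rename", name, new_name))
        self._do_rename_field(name, new_name)
        return self

    def replay(self) -> None:
        # Replay must go through _do_rename_field; calling the public method
        # here would append to _operations again on every retry.
        for _op, name, new_name in list(self._operations):
            self._do_rename_field(name, new_name)


spec = SpecBuilder().rename_field("ts_day", "event_day")
spec.replay()
print(spec._operations)   # still exactly one recorded rename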

pyiceberg/utils/retry.py

Lines changed: 0 additions & 2 deletions
@@ -15,8 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.

-"""Retry utilities for Iceberg operations."""
-
 from __future__ import annotations

 import logging
