Skip to content

Commit f888d50

Browse files
committed
fix lint
1 parent 4bc975b commit f888d50

File tree

6 files changed

+77
-47
lines changed

6 files changed

+77
-47
lines changed

pyiceberg/table/update/schema.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,13 @@ class UpdateSchema(UpdateTableMetadata["UpdateSchema"]):
9191
_case_sensitive: bool
9292

9393
# Store user intent for retry support
94-
_column_additions: list[tuple[str | tuple[str, ...], IcebergType, str | None, bool, L | None]]
94+
_column_additions: list[tuple[str | tuple[str, ...], IcebergType, str | None, bool, Any]]
9595
_column_updates: list[tuple[str | tuple[str, ...], IcebergType | None, bool | None, str | None]]
9696
_column_deletions: list[str | tuple[str, ...]]
9797
_column_renames: list[tuple[str | tuple[str, ...], str]]
9898
_move_operations: list[tuple[str, str | tuple[str, ...], str | tuple[str, ...] | None]]
9999
_optional_columns: list[str | tuple[str, ...]]
100-
_default_value_updates: list[tuple[str | tuple[str, ...], L | None]]
100+
_default_value_updates: list[tuple[str | tuple[str, ...], Any]]
101101

102102
def __init__(
103103
self,
@@ -167,8 +167,8 @@ def _reset_state(self) -> None:
167167
for path_from, new_name in self._column_renames:
168168
self._do_rename_column(path_from, new_name)
169169

170-
for path, field_type, required, doc in self._column_updates:
171-
self._do_update_column(path, field_type, required, doc)
170+
for upd_path, upd_field_type, upd_required, upd_doc in self._column_updates:
171+
self._do_update_column(upd_path, upd_field_type, upd_required, upd_doc)
172172

173173
for path in self._optional_columns:
174174
self._set_column_requirement(path, required=False)
@@ -255,7 +255,7 @@ def _do_add_column(
255255
required: bool,
256256
default_value: L | None,
257257
) -> None:
258-
"""Internal method to add a column. Used by add_column and _reset_state."""
258+
"""Add a column to the schema. Used by add_column and _reset_state."""
259259
if isinstance(path, str):
260260
if "." in path:
261261
raise ValueError(f"Cannot add column with ambiguous name: {path}, provide a tuple instead")
@@ -340,7 +340,7 @@ def delete_column(self, path: str | tuple[str, ...]) -> UpdateSchema:
340340
return self
341341

342342
def _do_delete_column(self, path: str | tuple[str, ...]) -> None:
343-
"""Internal method to delete a column. Used by delete_column and _reset_state."""
343+
"""Delete a column from the schema. Used by delete_column and _reset_state."""
344344
name = (path,) if isinstance(path, str) else path
345345
full_name = ".".join(name)
346346

@@ -381,7 +381,7 @@ def rename_column(self, path_from: str | tuple[str, ...], new_name: str) -> Upda
381381
return self
382382

383383
def _do_rename_column(self, path_from: str | tuple[str, ...], new_name: str) -> None:
384-
"""Internal method to rename a column. Used by rename_column and _reset_state."""
384+
"""Rename a column in the schema. Used by rename_column and _reset_state."""
385385
path_from = ".".join(path_from) if isinstance(path_from, tuple) else path_from
386386
field_from = self._schema.find_field(path_from, self._case_sensitive)
387387

@@ -547,7 +547,7 @@ def _do_update_column(
547547
required: bool | None,
548548
doc: str | None,
549549
) -> None:
550-
"""Internal method to update a column. Used by update_column and _reset_state."""
550+
"""Update a column in the schema. Used by update_column and _reset_state."""
551551
path = (path,) if isinstance(path, str) else path
552552
full_name = ".".join(path)
553553

@@ -638,7 +638,7 @@ def move_first(self, path: str | tuple[str, ...]) -> UpdateSchema:
638638
return self
639639

640640
def _do_move_first(self, path: str | tuple[str, ...]) -> None:
641-
"""Internal method to move a field to first position. Used by move_first and _reset_state."""
641+
"""Move a field to first position. Used by move_first and _reset_state."""
642642
full_name = ".".join(path) if isinstance(path, tuple) else path
643643

644644
field_id = self._find_for_move(full_name)
@@ -662,7 +662,7 @@ def move_before(self, path: str | tuple[str, ...], before_path: str | tuple[str,
662662
return self
663663

664664
def _do_move_before(self, path: str | tuple[str, ...], before_path: str | tuple[str, ...]) -> None:
665-
"""Internal method to move a field before another. Used by move_before and _reset_state."""
665+
"""Move a field before another. Used by move_before and _reset_state."""
666666
full_name = ".".join(path) if isinstance(path, tuple) else path
667667
field_id = self._find_for_move(full_name)
668668

@@ -700,7 +700,7 @@ def move_after(self, path: str | tuple[str, ...], after_name: str | tuple[str, .
700700
return self
701701

702702
def _do_move_after(self, path: str | tuple[str, ...], after_name: str | tuple[str, ...]) -> None:
703-
"""Internal method to move a field after another. Used by move_after and _reset_state."""
703+
"""Move a field after another. Used by move_after and _reset_state."""
704704
full_name = ".".join(path) if isinstance(path, tuple) else path
705705

706706
field_id = self._find_for_move(full_name)

pyiceberg/table/update/spec.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ def _init_state_from_metadata(self) -> None:
8686
def _reset_state(self) -> None:
8787
"""Reset state for retry, rebuilding from refreshed metadata.
8888
89-
This is called on transaction retry to reapply the spec changes on top of the refreshed table metadata."""
89+
This is called on transaction retry to reapply the spec changes on top of the refreshed table metadata.
90+
"""
9091
self._init_state_from_metadata()
9192
for source_column_name, transform, partition_field_name in self._field_additions:
9293
self._do_add_field(source_column_name, transform, partition_field_name)

tests/catalog/test_sql.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
SqlCatalogBaseTable,
4040
)
4141
from pyiceberg.exceptions import (
42-
CommitFailedException,
4342
NamespaceAlreadyExistsError,
4443
NamespaceNotEmptyError,
4544
NoSuchNamespaceError,

tests/integration/test_catalog.py

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
import os
1919
from collections.abc import Generator
2020
from pathlib import Path, PosixPath
21+
from unittest.mock import patch
2122

2223
import pytest
23-
from unittest.mock import patch
2424

2525
from pyiceberg.catalog import Catalog, MetastoreCatalog, load_catalog
2626
from pyiceberg.catalog.hive import HiveCatalog
@@ -306,11 +306,7 @@ def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, tabl
306306
identifier = (database_name, table_name)
307307

308308
test_catalog.create_namespace(database_name)
309-
table = test_catalog.create_table(
310-
identifier,
311-
test_schema,
312-
properties={TableProperties.COMMIT_NUM_RETRIES: "1"}
313-
)
309+
table = test_catalog.create_table(identifier, test_schema, properties={TableProperties.COMMIT_NUM_RETRIES: "1"})
314310
assert test_catalog.table_exists(identifier)
315311

316312
original_update = table.update_schema().add_column("new_col", LongType())
@@ -330,18 +326,16 @@ def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, tabl
330326

331327
@pytest.mark.integration
332328
@pytest.mark.parametrize("test_catalog", CATALOGS)
333-
def test_update_schema_conflict_with_retry(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
329+
def test_update_schema_conflict_with_retry(
330+
test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str
331+
) -> None:
334332
if isinstance(test_catalog, HiveCatalog):
335333
pytest.skip("HiveCatalog fails in this test, need to investigate")
336334

337335
identifier = (database_name, table_name)
338336

339337
test_catalog.create_namespace(database_name)
340-
table = test_catalog.create_table(
341-
identifier,
342-
test_schema,
343-
properties={TableProperties.COMMIT_NUM_RETRIES: "2"}
344-
)
338+
table = test_catalog.create_table(identifier, test_schema, properties={TableProperties.COMMIT_NUM_RETRIES: "2"})
345339
assert test_catalog.table_exists(identifier)
346340

347341
original_update = table.update_schema().add_column("new_col", LongType())
@@ -361,7 +355,7 @@ def mock_commit(
361355
concurrent_update.commit()
362356
original_update.commit()
363357

364-
assert commit_count == 3 # concurrent_update, original_update(fail), retry original_update(success)
358+
assert commit_count == 3 # concurrent_update, original_update(fail), retry original_update(success)
365359

366360
table = test_catalog.load_table(identifier)
367361
expected_schema = Schema(
@@ -578,10 +572,7 @@ def test_update_table_spec_conflict(test_catalog: Catalog, test_schema: Schema,
578572
test_catalog.create_namespace(database_name)
579573
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="id_bucket"))
580574
table = test_catalog.create_table(
581-
identifier,
582-
test_schema,
583-
partition_spec=spec,
584-
properties={TableProperties.COMMIT_NUM_RETRIES: "1"}
575+
identifier, test_schema, partition_spec=spec, properties={TableProperties.COMMIT_NUM_RETRIES: "1"}
585576
)
586577

587578
update = table.update_spec()
@@ -603,15 +594,14 @@ def test_update_table_spec_conflict(test_catalog: Catalog, test_schema: Schema,
603594

604595
@pytest.mark.integration
605596
@pytest.mark.parametrize("test_catalog", CATALOGS)
606-
def test_update_table_spec_conflict_with_retry(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
597+
def test_update_table_spec_conflict_with_retry(
598+
test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str
599+
) -> None:
607600
identifier = (database_name, table_name)
608601
test_catalog.create_namespace(database_name)
609602
spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="id_bucket"))
610603
table = test_catalog.create_table(
611-
identifier,
612-
test_schema,
613-
partition_spec=spec,
614-
properties={TableProperties.COMMIT_NUM_RETRIES: "2"}
604+
identifier, test_schema, partition_spec=spec, properties={TableProperties.COMMIT_NUM_RETRIES: "2"}
615605
)
616606
update = table.update_spec()
617607
update.add_field(source_column_name="tpep_pickup_datetime", transform=BucketTransform(16), partition_field_name="shard")

tests/integration/test_writes/test_optimistic_concurrency.py

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18+
from unittest.mock import patch
19+
1820
import pyarrow as pa
1921
import pytest
2022
from pyspark.sql import SparkSession
21-
from unittest.mock import patch
2223

2324
from pyiceberg.catalog import Catalog
2425
from pyiceberg.exceptions import CommitFailedException
@@ -34,7 +35,12 @@ def test_conflict_delete_delete(
3435
) -> None:
3536
"""This test should start passing once optimistic concurrency control has been implemented."""
3637
identifier = "default.test_conflict"
37-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"}, [arrow_table_with_null])
38+
tbl1 = _create_table(
39+
session_catalog,
40+
identifier,
41+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"},
42+
[arrow_table_with_null],
43+
)
3844
tbl2 = session_catalog.load_table(identifier)
3945

4046
tbl1.delete("string == 'z'")
@@ -51,7 +57,12 @@ def test_conflict_delete_delete_with_retry(
5157
) -> None:
5258
"""This test should start passing once optimistic concurrency control has been implemented."""
5359
identifier = "default.test_conflict"
54-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"}, [arrow_table_with_null])
60+
tbl1 = _create_table(
61+
session_catalog,
62+
identifier,
63+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"},
64+
[arrow_table_with_null],
65+
)
5566
tbl2 = session_catalog.load_table(identifier)
5667

5768
tbl1.delete("string == 'z'")
@@ -73,14 +84,20 @@ def mock_commit(
7384

7485
assert commit_count == 2
7586

87+
7688
@pytest.mark.integration
7789
@pytest.mark.parametrize("format_version", [1, 2])
7890
def test_conflict_delete_append(
7991
spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
8092
) -> None:
8193
"""This test should start passing once optimistic concurrency control has been implemented."""
8294
identifier = "default.test_conflict"
83-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"}, [arrow_table_with_null])
95+
tbl1 = _create_table(
96+
session_catalog,
97+
identifier,
98+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"},
99+
[arrow_table_with_null],
100+
)
84101
tbl2 = session_catalog.load_table(identifier)
85102

86103
# This is allowed
@@ -98,7 +115,12 @@ def test_conflict_delete_append_with_retry(
98115
) -> None:
99116
"""This test should start passing once optimistic concurrency control has been implemented."""
100117
identifier = "default.test_conflict"
101-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"}, [arrow_table_with_null])
118+
tbl1 = _create_table(
119+
session_catalog,
120+
identifier,
121+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"},
122+
[arrow_table_with_null],
123+
)
102124
tbl2 = session_catalog.load_table(identifier)
103125

104126
# This is allowed
@@ -129,7 +151,12 @@ def test_conflict_append_delete(
129151
) -> None:
130152
"""This test should start passing once optimistic concurrency control has been implemented."""
131153
identifier = "default.test_conflict"
132-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"}, [arrow_table_with_null])
154+
tbl1 = _create_table(
155+
session_catalog,
156+
identifier,
157+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"},
158+
[arrow_table_with_null],
159+
)
133160
tbl2 = session_catalog.load_table(identifier)
134161

135162
tbl1.append(arrow_table_with_null)
@@ -146,7 +173,12 @@ def test_conflict_append_delete_with_retry(
146173
) -> None:
147174
"""This test should start passing once optimistic concurrency control has been implemented."""
148175
identifier = "default.test_conflict"
149-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"}, [arrow_table_with_null])
176+
tbl1 = _create_table(
177+
session_catalog,
178+
identifier,
179+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"},
180+
[arrow_table_with_null],
181+
)
150182
tbl2 = session_catalog.load_table(identifier)
151183

152184
tbl1.append(arrow_table_with_null)
@@ -176,7 +208,12 @@ def test_conflict_append_append(
176208
) -> None:
177209
"""This test should start passing once optimistic concurrency control has been implemented."""
178210
identifier = "default.test_conflict"
179-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"}, [arrow_table_with_null])
211+
tbl1 = _create_table(
212+
session_catalog,
213+
identifier,
214+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "1"},
215+
[arrow_table_with_null],
216+
)
180217
tbl2 = session_catalog.load_table(identifier)
181218

182219
tbl1.append(arrow_table_with_null)
@@ -193,7 +230,12 @@ def test_conflict_append_append_with_retry(
193230
) -> None:
194231
"""This test should start passing once optimistic concurrency control has been implemented."""
195232
identifier = "default.test_conflict"
196-
tbl1 = _create_table(session_catalog, identifier, {"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"}, [arrow_table_with_null])
233+
tbl1 = _create_table(
234+
session_catalog,
235+
identifier,
236+
{"format-version": format_version, TableProperties.COMMIT_NUM_RETRIES: "2"},
237+
[arrow_table_with_null],
238+
)
197239
tbl2 = session_catalog.load_table(identifier)
198240

199241
tbl1.append(arrow_table_with_null)

tests/table/test_commit_retry.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,9 +1024,7 @@ def mock_commit(
10241024

10251025
with patch.object(catalog, "commit_table", side_effect=mock_commit):
10261026
with table.update_spec() as update_spec:
1027-
update_spec.add_field(
1028-
source_column_name="id", transform=BucketTransform(16), partition_field_name="id_bucket"
1029-
)
1027+
update_spec.add_field(source_column_name="id", transform=BucketTransform(16), partition_field_name="id_bucket")
10301028

10311029
assert commit_count == 2
10321030

0 commit comments

Comments (0)