|
31 | 31 | NoSuchNamespaceError, |
32 | 32 | NoSuchTableError, |
33 | 33 | TableAlreadyExistsError, |
| 34 | + ValidationError, |
34 | 35 | ) |
35 | 36 | from pyiceberg.io import WAREHOUSE |
| 37 | +from pyiceberg.partitioning import PartitionField, PartitionSpec |
36 | 38 | from pyiceberg.schema import Schema |
| 39 | +from pyiceberg.table.sorting import SortOrder |
| 40 | +from pyiceberg.transforms import DayTransform |
| 41 | +from pyiceberg.types import IntegerType, NestedField, TimestampType |
37 | 42 | from tests.conftest import clean_up |
38 | 43 |
|
39 | 44 |
|
@@ -247,6 +252,59 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa |
247 | 252 | assert test_catalog.table_exists((database_name, table_name)) is True |
248 | 253 |
|
249 | 254 |
|
| 255 | +@pytest.mark.integration |
| 256 | +@pytest.mark.parametrize("test_catalog", CATALOGS) |
| 257 | +def test_incompatible_partitioned_schema_evolution( |
| 258 | + test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, database_name: str, table_name: str |
| 259 | +) -> None: |
| 260 | + if isinstance(test_catalog, HiveCatalog): |
| 261 | + pytest.skip("HiveCatalog does not support schema evolution") |
| 262 | + |
| 263 | + identifier = (database_name, table_name) |
| 264 | + test_catalog.create_namespace(database_name) |
| 265 | + table = test_catalog.create_table(identifier, test_schema, partition_spec=test_partition_spec) |
| 266 | + assert test_catalog.table_exists(identifier) |
| 267 | + |
| 268 | + with pytest.raises(ValidationError): |
| 269 | + with table.update_schema() as update: |
| 270 | + update.delete_column("VendorID") |
| 271 | + |
| 272 | + # Assert column was not dropped |
| 273 | + assert "VendorID" in table.schema().column_names |
| 274 | + |
| 275 | + with table.transaction() as transaction: |
| 276 | + with transaction.update_spec() as spec_update: |
| 277 | + spec_update.remove_field("VendorID") |
| 278 | + |
| 279 | + with transaction.update_schema() as schema_update: |
| 280 | + schema_update.delete_column("VendorID") |
| 281 | + |
| 282 | + assert table.spec() == PartitionSpec(PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"), spec_id=1) |
| 283 | + assert table.schema() == Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False)) |
| 284 | + |
| 285 | + |
| 286 | +@pytest.mark.integration |
| 287 | +@pytest.mark.parametrize("test_catalog", CATALOGS) |
| 288 | +def test_incompatible_sorted_schema_evolution( |
| 289 | + test_catalog: Catalog, test_schema: Schema, test_sort_order: SortOrder, database_name: str, table_name: str |
| 290 | +) -> None: |
| 291 | + if isinstance(test_catalog, HiveCatalog): |
| 292 | + pytest.skip("HiveCatalog does not support schema evolution") |
| 293 | + |
| 294 | + identifier = (database_name, table_name) |
| 295 | + test_catalog.create_namespace(database_name) |
| 296 | + table = test_catalog.create_table(identifier, test_schema, sort_order=test_sort_order) |
| 297 | + assert test_catalog.table_exists(identifier) |
| 298 | + |
| 299 | + with pytest.raises(ValidationError): |
| 300 | + with table.update_schema() as update: |
| 301 | + update.delete_column("VendorID") |
| 302 | + |
| 303 | + assert table.schema() == Schema( |
| 304 | + NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", TimestampType(), False) |
| 305 | + ) |
| 306 | + |
| 307 | + |
250 | 308 | @pytest.mark.integration |
251 | 309 | @pytest.mark.parametrize("test_catalog", CATALOGS) |
252 | 310 | def test_create_namespace(test_catalog: Catalog, database_name: str) -> None: |
|
0 commit comments