Skip to content

Commit c4ee6d4

Browse files
committed
Allow for date->timestamp promotion on v3
This allows date->timestamp (and timestamp_ns) promotion on v3 tables only.
1 parent 4348ee8 commit c4ee6d4

File tree

5 files changed

+109
-12
lines changed

5 files changed

+109
-12
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1596,6 +1596,7 @@ def _task_to_record_batches(
15961596
current_batch,
15971597
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
15981598
projected_missing_fields=projected_missing_fields,
1599+
format_version=format_version,
15991600
)
16001601

16011602

@@ -1788,13 +1789,18 @@ def _to_requested_schema(
17881789
downcast_ns_timestamp_to_us: bool = False,
17891790
include_field_ids: bool = False,
17901791
projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
1792+
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
17911793
) -> pa.RecordBatch:
17921794
# We could reuse some of these visitors
17931795
struct_array = visit_with_partner(
17941796
requested_schema,
17951797
batch,
17961798
ArrowProjectionVisitor(
1797-
file_schema, downcast_ns_timestamp_to_us, include_field_ids, projected_missing_fields=projected_missing_fields
1799+
file_schema,
1800+
downcast_ns_timestamp_to_us,
1801+
include_field_ids,
1802+
projected_missing_fields=projected_missing_fields,
1803+
format_version=format_version,
17981804
),
17991805
ArrowAccessor(file_schema),
18001806
)
@@ -1808,19 +1814,23 @@ class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Arra
18081814
_use_large_types: Optional[bool]
18091815
_projected_missing_fields: Dict[int, Any]
18101816

1817+
_format_version: TableVersion
1818+
18111819
def __init__(
18121820
self,
18131821
file_schema: Schema,
18141822
downcast_ns_timestamp_to_us: bool = False,
18151823
include_field_ids: bool = False,
18161824
use_large_types: Optional[bool] = None,
18171825
projected_missing_fields: Dict[int, Any] = EMPTY_DICT,
1826+
format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
18181827
) -> None:
18191828
self._file_schema = file_schema
18201829
self._include_field_ids = include_field_ids
18211830
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
18221831
self._use_large_types = use_large_types
18231832
self._projected_missing_fields = projected_missing_fields
1833+
self._format_version = format_version
18241834

18251835
if use_large_types is not None:
18261836
deprecation_message(
@@ -1862,7 +1872,8 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
18621872

18631873
if field.field_type != file_field.field_type:
18641874
target_schema = schema_to_pyarrow(
1865-
promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
1875+
promote(file_field.field_type, field.field_type, format_version=self._format_version),
1876+
include_field_ids=self._include_field_ids,
18661877
)
18671878
if self._use_large_types is False:
18681879
target_schema = _pyarrow_schema_ensure_small_types(target_schema)
@@ -2568,6 +2579,7 @@ def write_parquet(task: WriteTask) -> DataFile:
25682579
batch=batch,
25692580
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
25702581
include_field_ids=True,
2582+
format_version=table_metadata.format_version,
25712583
)
25722584
for batch in task.record_batches
25732585
]
@@ -2658,7 +2670,7 @@ def _check_pyarrow_schema_compatible(
26582670
raise ValueError(
26592671
f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
26602672
) from e
2661-
_check_schema_compatible(requested_schema, provided_schema)
2673+
_check_schema_compatible(requested_schema, provided_schema, format_version=format_version)
26622674

26632675

26642676
def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, file_paths: Iterator[str]) -> Iterator[DataFile]:

pyiceberg/schema.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from pydantic import Field, PrivateAttr, model_validator
4040

4141
from pyiceberg.exceptions import ResolveError
42-
from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol
42+
from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol, TableVersion
4343
from pyiceberg.types import (
4444
BinaryType,
4545
BooleanType,
@@ -1622,7 +1622,10 @@ def _project_map(map_type: MapType, value_result: IcebergType) -> MapType:
16221622

16231623

16241624
@singledispatch
1625-
def promote(file_type: IcebergType, read_type: IcebergType) -> IcebergType:
1625+
def promote(file_type: IcebergType, read_type: IcebergType, format_version: Optional[TableVersion] = None) -> IcebergType:
1626+
from pyiceberg.table import TableProperties
1627+
1628+
format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION
16261629
"""Promotes reading a file type to a read type.
16271630
16281631
Args:
@@ -1692,6 +1695,22 @@ def _(file_type: FixedType, read_type: IcebergType) -> IcebergType:
16921695
raise ResolveError(f"Cannot promote {file_type} to {read_type}")
16931696

16941697

1698+
@promote.register(DateType)
def _(file_type: DateType, read_type: IcebergType, format_version: Optional[TableVersion] = None) -> IcebergType:
    """Promote a date column to a timestamp type; allowed on v3+ tables only.

    Args:
        file_type: The DateType as written in the data file.
        read_type: The requested read type; must be TimestampType or TimestampNanoType.
        format_version: The table's format version; falls back to the library
            default format version when not provided.

    Returns:
        The read type, when the promotion is valid.

    Raises:
        ResolveError: If the table format version is below 3, or if read_type
            is not a timestamp type.
    """
    # Imported locally to avoid a circular import between schema and table modules.
    from pyiceberg.table import TableProperties

    format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION
    # Date -> timestamp promotion was introduced with the Iceberg V3 spec.
    if format_version < 3:
        raise ResolveError("DateType promotions can only occur on v3 tables.")

    # Both microsecond and nanosecond timestamps are valid promotion targets;
    # the two original branches were identical, so test both types at once.
    if isinstance(read_type, (TimestampType, TimestampNanoType)):
        return read_type

    raise ResolveError(f"Cannot promote {file_type} to {read_type}")
1712+
1713+
16951714
@promote.register(UnknownType)
16961715
def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType:
16971716
# Per V3 Spec, "Unknown" can be promoted to any Primitive type
@@ -1701,7 +1720,12 @@ def _(file_type: UnknownType, read_type: IcebergType) -> IcebergType:
17011720
raise ResolveError(f"Cannot promote {file_type} to {read_type}")
17021721

17031722

1704-
def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) -> None:
1723+
def _check_schema_compatible(
1724+
requested_schema: Schema, provided_schema: Schema, format_version: Optional[TableVersion] = None
1725+
) -> None:
1726+
from pyiceberg.table import TableProperties
1727+
1728+
format_version = format_version or TableProperties.DEFAULT_FORMAT_VERSION
17051729
"""
17061730
Check if the `provided_schema` is compatible with `requested_schema`.
17071731
@@ -1715,17 +1739,19 @@ def _check_schema_compatible(requested_schema: Schema, provided_schema: Schema)
17151739
Raises:
17161740
ValueError: If the schemas are not compatible.
17171741
"""
1718-
pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema))
1742+
pre_order_visit(requested_schema, _SchemaCompatibilityVisitor(provided_schema, format_version=format_version))
17191743

17201744

17211745
class _SchemaCompatibilityVisitor(PreOrderSchemaVisitor[bool]):
17221746
provided_schema: Schema
1747+
format_version: TableVersion
17231748

1724-
def __init__(self, provided_schema: Schema):
1749+
def __init__(self, provided_schema: Schema, format_version: TableVersion):
17251750
from rich.console import Console
17261751
from rich.table import Table as RichTable
17271752

17281753
self.provided_schema = provided_schema
1754+
self.format_version = format_version
17291755
self.rich_table = RichTable(show_header=True, header_style="bold")
17301756
self.rich_table.add_column("")
17311757
self.rich_table.add_column("Table field")
@@ -1766,7 +1792,7 @@ def _is_field_compatible(self, lhs: NestedField) -> bool:
17661792
try:
17671793
# If type can be promoted to the requested schema
17681794
# it is considered compatible
1769-
promote(rhs.field_type, lhs.field_type)
1795+
promote(rhs.field_type, lhs.field_type, format_version=self.format_version)
17701796
self.rich_table.add_row("✅", str(lhs), str(rhs))
17711797
return True
17721798
except ResolveError:

pyiceberg/table/update/schema.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ def update_column(
470470

471471
if not self._allow_incompatible_changes and field.field_type != field_type:
472472
try:
473-
promote(field.field_type, field_type)
473+
promote(field.field_type, field_type, format_version=self._transaction.table_metadata.format_version)
474474
except ResolveError as e:
475475
raise ValidationError(f"Cannot change column type: {full_name}: {field.field_type} -> {field_type}") from e
476476

@@ -894,7 +894,9 @@ def _update_column(self, field: NestedField, existing_field: NestedField) -> Non
894894
try:
895895
# If the current type is wider than the new type, then
896896
# we perform a noop
897-
_ = promote(field.field_type, existing_field.field_type)
897+
_ = promote(
898+
field.field_type, existing_field.field_type, self.update_schema._transaction.table_metadata.format_version
899+
)
898900
except ResolveError:
899901
# If this is not the case, perform the type evolution
900902
self.update_schema.update_column(full_name, field_type=field.field_type)

tests/conftest.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
from pyiceberg.schema import Accessor, Schema
7373
from pyiceberg.serializers import ToOutputFile
7474
from pyiceberg.table import FileScanTask, Table
75-
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
75+
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3
7676
from pyiceberg.transforms import DayTransform, IdentityTransform
7777
from pyiceberg.types import (
7878
BinaryType,
@@ -2468,6 +2468,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
24682468
)
24692469

24702470

2471+
@pytest.fixture
def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table:
    """Build a v3 `Table` from the example v3 metadata dictionary."""
    metadata = TableMetadataV3(**example_table_metadata_v3)
    metadata_location = f"{metadata.location}/uuid.metadata.json"
    return Table(
        identifier=("database", "table"),
        metadata=metadata,
        metadata_location=metadata_location,
        io=load_file_io(),
        catalog=NoopCatalog("NoopCatalog"),
    )
2481+
2482+
24712483
@pytest.fixture
24722484
def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table:
24732485
import copy

tests/test_schema.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
PrimitiveType,
5353
StringType,
5454
StructType,
55+
TimestampNanoType,
5556
TimestampType,
5657
TimestamptzType,
5758
TimeType,
@@ -1687,3 +1688,47 @@ def test_arrow_schema() -> None:
16871688
)
16881689

16891690
assert base_schema.as_arrow() == expected_schema
1691+
1692+
1693+
def test_promote_date_to_timestamp(table_v3: Table) -> None:
    """A DateType column can be widened to TimestampType on a v3 table."""
    schema_before = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
    schema_after = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False))

    update = UpdateSchema(transaction=table_v3.transaction(), schema=schema_before)
    applied = update.union_by_name(schema_after)._apply()

    assert applied.as_struct() == schema_after.as_struct()
    assert len(applied.fields) == 1
    assert isinstance(applied.fields[0].field_type, TimestampType)
1704+
1705+
1706+
def test_promote_date_to_timestampnano(table_v3: Table) -> None:
    """A DateType column can be widened to TimestampNanoType on a v3 table."""
    schema_before = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
    schema_after = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampNanoType(), required=False))

    update = UpdateSchema(transaction=table_v3.transaction(), schema=schema_before)
    applied = update.union_by_name(schema_after)._apply()

    assert applied.as_struct() == schema_after.as_struct()
    assert len(applied.fields) == 1
    assert isinstance(applied.fields[0].field_type, TimestampNanoType)
1717+
1718+
1719+
def test_promote_date_fails_for_v1_table(table_v1: Table) -> None:
    """Widening DateType to TimestampType is rejected on a v1 table."""
    schema_before = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
    schema_after = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False))

    with pytest.raises(ValidationError, match="Cannot change column type: a_date: date -> timestamp"):
        UpdateSchema(transaction=Transaction(table_v1), schema=schema_before).union_by_name(schema_after)._apply()
1726+
1727+
1728+
def test_promote_date_fails_for_v2_table(table_v2: Table) -> None:
    """Widening DateType to TimestampType is rejected on a v2 table."""
    schema_before = Schema(NestedField(field_id=1, name="a_date", field_type=DateType(), required=False))
    schema_after = Schema(NestedField(field_id=1, name="a_date", field_type=TimestampType(), required=False))

    with pytest.raises(ValidationError, match="Cannot change column type: a_date: date -> timestamp"):
        UpdateSchema(transaction=Transaction(table_v2), schema=schema_before).union_by_name(schema_after)._apply()

0 commit comments

Comments
 (0)