diff --git a/pyiceberg/partitioning.py b/pyiceberg/partitioning.py
index 45d0dfd212..8bf2b817d9 100644
--- a/pyiceberg/partitioning.py
+++ b/pyiceberg/partitioning.py
@@ -56,6 +56,7 @@
     TimestampType,
     TimestamptzType,
     TimeType,
+    UnknownType,
     UUIDType,
 )
 from pyiceberg.utils.datetime import date_to_days, datetime_to_micros, time_to_micros
@@ -222,11 +223,14 @@ def partition_type(self, schema: Schema) -> StructType:
         :return: A StructType that represents the PartitionSpec, with a NestedField for each PartitionField.
         """
         nested_fields = []
+        schema_ids = schema._lazy_id_to_field
         for field in self.fields:
-            source_type = schema.find_type(field.source_id)
-            result_type = field.transform.result_type(source_type)
-            required = schema.find_field(field.source_id).required
-            nested_fields.append(NestedField(field.field_id, field.name, result_type, required=required))
+            if source_field := schema_ids.get(field.source_id):
+                result_type = field.transform.result_type(source_field.field_type)
+                nested_fields.append(NestedField(field.field_id, field.name, result_type, required=source_field.required))
+            else:
+                # Since the source field has been dropped, we cannot determine the type
+                nested_fields.append(NestedField(field.field_id, field.name, UnknownType()))
         return StructType(*nested_fields)
 
     def partition_to_path(self, data: Record, schema: Schema) -> str:
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 99116ad16f..785037aef3 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -1083,3 +1083,19 @@ def test_filter_after_arrow_scan(catalog: Catalog) -> None:
     scan = scan.filter("ts >= '2023-03-05T00:00:00+00:00'")
 
     assert len(scan.to_arrow()) > 0
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")])
+def test_scan_source_field_missing_in_spec(catalog: Catalog, spark: SparkSession) -> None:
+    identifier = "default.test_dropped_field"
+    spark.sql(f"DROP TABLE IF EXISTS {identifier}")
+    spark.sql(f"CREATE TABLE {identifier} (foo int, bar int, jaz string) USING ICEBERG PARTITIONED BY (foo, bar)")
+    spark.sql(
+        f"INSERT INTO {identifier} (foo, bar, jaz) VALUES (1, 1, 'dummy data'), (1, 2, 'dummy data again'), (2, 1, 'another partition')"
+    )
+    spark.sql(f"ALTER TABLE {identifier} DROP PARTITION FIELD foo")
+    spark.sql(f"ALTER TABLE {identifier} DROP COLUMN foo")
+
+    table = catalog.load_table(identifier)
+    assert len(list(table.scan().plan_files())) == 3
diff --git a/tests/table/test_partitioning.py b/tests/table/test_partitioning.py
index 0fe22391c0..576297c6f2 100644
--- a/tests/table/test_partitioning.py
+++ b/tests/table/test_partitioning.py
@@ -47,6 +47,7 @@
     TimestampType,
     TimestamptzType,
     TimeType,
+    UnknownType,
     UUIDType,
 )
 
@@ -165,6 +166,28 @@ def test_partition_spec_to_path() -> None:
     assert spec.partition_to_path(record, schema) == "my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
 
 
+def test_partition_spec_to_path_dropped_source_id() -> None:
+    schema = Schema(
+        NestedField(field_id=1, name="str", field_type=StringType(), required=False),
+        NestedField(field_id=2, name="other_str", field_type=StringType(), required=False),
+        NestedField(field_id=3, name="int", field_type=IntegerType(), required=True),
+    )
+
+    spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="my#str%bucket"),
+        PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="other str+bucket"),
+        # Point the partition field to a missing source id
+        PartitionField(source_id=4, field_id=1002, transform=BucketTransform(num_buckets=25), name="my!int:bucket"),
+        spec_id=3,
+    )
+
+    record = Record("my+str", "( )", 10)
+
+    # Both partition field names and values should be URL encoded, with spaces mapping to plus signs, to match the Java
+    # behaviour: https://github.com/apache/iceberg/blob/ca3db931b0f024f0412084751ac85dd4ef2da7e7/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L198-L204
+    assert spec.partition_to_path(record, schema) == "my%23str%25bucket=my%2Bstr/other+str%2Bbucket=%28+%29/my%21int%3Abucket=10"
+
+
 def test_partition_type(table_schema_simple: Schema) -> None:
     spec = PartitionSpec(
         PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"),
@@ -178,6 +201,19 @@ def test_partition_type(table_schema_simple: Schema) -> None:
     )
 
 
+def test_partition_type_missing_source_field(table_schema_simple: Schema) -> None:
+    spec = PartitionSpec(
+        PartitionField(source_id=1, field_id=1000, transform=TruncateTransform(width=19), name="str_truncate"),
+        PartitionField(source_id=10, field_id=1001, transform=BucketTransform(num_buckets=25), name="int_bucket"),
+        spec_id=3,
+    )
+
+    assert spec.partition_type(table_schema_simple) == StructType(
+        NestedField(field_id=1000, name="str_truncate", field_type=StringType(), required=False),
+        NestedField(field_id=1001, name="int_bucket", field_type=UnknownType(), required=False),
+    )
+
+
 @pytest.mark.parametrize(
     "source_type, value",
     [
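
For reference, a minimal sketch of the new fallback behaviour in partition_type. The schema and spec below are hypothetical examples; the classes and the partition_type call are the pyiceberg APIs touched by this diff.

from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import BucketTransform
from pyiceberg.types import IntegerType, NestedField, UnknownType

schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))

# source_id=99 has no matching schema field, e.g. because the column was dropped.
spec = PartitionSpec(
    PartitionField(source_id=99, field_id=1000, transform=BucketTransform(num_buckets=4), name="id_bucket")
)

# Before this patch, schema.find_type(99) raised for the missing source field;
# now the partition field falls back to UnknownType.
assert spec.partition_type(schema).fields[0].field_type == UnknownType()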