diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index f6ae41e3d386..1e6f2ba95357 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -160,7 +160,8 @@ def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=T return entries def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List: - if file_dict['_VALUE_STATS_COLS'] is None: + value_stats_cols = file_dict.get('_VALUE_STATS_COLS') + if value_stats_cols is None: if '_WRITE_COLS' in file_dict: if file_dict['_WRITE_COLS'] is None: fields = schema_fields @@ -169,10 +170,10 @@ def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List: fields = [self.table.field_dict[col] for col in read_fields] else: fields = schema_fields - elif not file_dict['_VALUE_STATS_COLS']: + elif not value_stats_cols: fields = [] else: - fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']] + fields = [self.table.field_dict[col] for col in value_stats_cols] return fields def write(self, file_name, entries: List[ManifestEntry]): diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index c6223dadcb31..3e0244e3d983 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -471,6 +471,9 @@ def test_value_stats_cols_param(self): test_name="specific_case" ) + # Test case 4: _VALUE_STATS_COLS field is missing (old version manifest files) + self._test_value_stats_cols_missing_case(manifest_manager, table) + schema_with_stats = Schema.from_pyarrow_schema(pa_schema, options={'metadata.stats-mode': 'full'}) catalog.create_table("test_db.test_value_stats_cols_schema_match", schema_with_stats, False) table_with_stats = catalog.get_table("test_db.test_value_stats_cols_schema_match") @@ -813,6 +816,23 @@ def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, manifest_file_name = f"manifest-test-{test_name}" manifest_manager.write(manifest_file_name, [entry]) + if value_stats_cols is None: + from pathlib import Path + import fastavro + manifest_path = Path(table.table_path) / "manifest" / manifest_file_name + with table.file_io.new_input_stream(str(manifest_path)) as f: + reader = fastavro.reader(f) + records = list(reader) + file_dict = records[0]['_FILE'] + try: + if file_dict['_VALUE_STATS_COLS'] is None: + pass + except KeyError: + self.fail( + "_VALUE_STATS_COLS field should be written to manifest file when value is None. " + "Field missing indicates fastavro omitted the None value, which breaks backward compatibility." + ) + # Read the manifest entry back entries = manifest_manager.read(manifest_file_name, drop_stats=False) @@ -883,6 +903,43 @@ def _test_append_only_schema_match_case(self, table, pa_schema): f"but table.fields has {table_field_names}" ) + def _test_value_stats_cols_missing_case(self, manifest_manager, table): + from io import BytesIO + from pathlib import Path + import fastavro + from pypaimon.manifest.schema.manifest_entry import MANIFEST_ENTRY_SCHEMA + from pypaimon.table.row.binary_row import BinaryRow + from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer + + empty_row_bytes = GenericRowSerializer.to_bytes(BinaryRow(b'\x00' * 12, [])) + empty_generic_row = GenericRow([], []) + manifest_file_name = "manifest-missing-value-stats-cols-test" + manifest_path = Path(table.table_path) / "manifest" / manifest_file_name + manifest_path.parent.mkdir(parents=True, exist_ok=True) + + file_dict = { + "_FILE_NAME": "test.parquet", "_FILE_SIZE": 1000, "_ROW_COUNT": 10, + "_MIN_KEY": GenericRowSerializer.to_bytes(empty_generic_row), + "_MAX_KEY": GenericRowSerializer.to_bytes(empty_generic_row), + "_KEY_STATS": {"_MIN_VALUES": empty_row_bytes, "_MAX_VALUES": empty_row_bytes, "_NULL_COUNTS": []}, + "_VALUE_STATS": {"_MIN_VALUES": empty_row_bytes, "_MAX_VALUES": empty_row_bytes, "_NULL_COUNTS": []}, + "_MIN_SEQUENCE_NUMBER": 1, "_MAX_SEQUENCE_NUMBER": 10, "_SCHEMA_ID": 0, "_LEVEL": 0, + "_EXTRA_FILES": [], "_CREATION_TIME": None, "_DELETE_ROW_COUNT": None, + "_EMBEDDED_FILE_INDEX": None, "_FILE_SOURCE": None, "_EXTERNAL_PATH": None, + } + + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, [{ + "_VERSION": 2, "_KIND": 0, "_PARTITION": GenericRowSerializer.to_bytes(empty_generic_row), + "_BUCKET": 0, "_TOTAL_BUCKETS": 1, "_FILE": file_dict + }]) + with table.file_io.new_output_stream(str(manifest_path)) as out: + out.write(buffer.getvalue()) + + entries = manifest_manager.read(manifest_file_name, drop_stats=False) + self.assertEqual(len(entries), 1) + self.assertIsNone(entries[0].file.value_stats_cols) # Should be None when field is missing + def test_primary_key_value_stats(self): pa_schema = pa.schema([ ('id', pa.int64()),