diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index dd8433b1b4c1..c0e70ff4f892 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -548,9 +548,13 @@ void add(DataFileMeta file) {
                 }
             }
             if (!files.isEmpty()) {
-                checkArgument(
-                        file.schemaId() == files.get(0).schemaId(),
-                        "All files in this bunch should have the same schema id.");
+                // Blob files written before and after a schema change carry different schema
+                // ids, so only non-blob files are required to share one.
+                if (!isBlobFile(file.fileName())) {
+                    checkArgument(
+                            file.schemaId() == files.get(0).schemaId(),
+                            "All files in this bunch should have the same schema id.");
+                }
                 checkArgument(
                         file.writeCols().equals(files.get(0).writeCols()),
                         "All files in this bunch should have the same write columns.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 8667f2271d32..3aca4b762be5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -42,6 +42,7 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
@@ -404,6 +405,7 @@ protected void updateLastColumn(
             } else if (change instanceof RenameColumn) {
                 RenameColumn rename = (RenameColumn) change;
                 assertNotUpdatingPartitionKeys(oldTableSchema, rename.fieldNames(), "rename");
+                assertNotRenamingBlobColumn(newFields, rename.fieldNames());
                 new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
@@ -908,6 +910,24 @@ private static void assertNotUpdatingPrimaryKeys(
         }
     }
 
+    /**
+     * Rejects renaming a top-level BLOB column. Blob files keep the column name in their write
+     * columns, so files written before and after a rename could no longer be read in one bunch.
+     * Nested field paths (more than one name) are not checked.
+     */
+    private static void assertNotRenamingBlobColumn(List<DataField> fields, String[] fieldNames) {
+        if (fieldNames.length > 1) {
+            return;
+        }
+        String fieldName = fieldNames[0];
+        for (DataField field : fields) {
+            if (field.name().equals(fieldName) && field.type().is(DataTypeRoot.BLOB)) {
+                throw new UnsupportedOperationException(
+                        String.format("Cannot rename BLOB column: [%s]", fieldName));
+            }
+        }
+    }
+
     private abstract static class NestedColumnModifier {
 
         private final String[] updateFieldNames;
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index ca640aa68dbf..bd1e006ec774 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -791,6 +791,88 @@ public void testJavaWriteCompressedTextAppendTable() throws Exception {
         }
     }
 
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testBlobWriteAlterCompact() throws Exception {
+        Identifier identifier = identifier("blob_alter_compact_test");
+        catalog.dropTable(identifier, true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.STRING())
+                        .column("f2", DataTypes.BLOB())
+                        .option("target-file-size", "100 MB")
+                        .option("row-tracking.enabled", "true")
+                        .option("data-evolution.enabled", "true")
+                        .option("compaction.min.file-num", "2")
+                        .option("bucket", "-1")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+
+        byte[] blobBytes = new byte[1024];
+        new java.util.Random(42).nextBytes(blobBytes);
+
+        // Batch 1: write with schemaId=0
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        StreamTableWrite write =
+                table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+        StreamTableCommit commit = table.newCommit(commitUser);
+        for (int i = 0; i < 100; i++) {
+            write.write(
+                    GenericRow.of(
+                            1,
+                            BinaryString.fromString("batch1"),
+                            new org.apache.paimon.data.BlobData(blobBytes)));
+        }
+        commit.commit(0, write.prepareCommit(false, 0));
+        write.close();
+        commit.close();
+
+        // ALTER TABLE SET -> schemaId becomes 1
+        catalog.alterTable(
+                identifier,
+                org.apache.paimon.schema.SchemaChange.setOption("snapshot.num-retained.min", "5"),
+                false);
+
+        // Batch 2: write with schemaId=1
+        table = (FileStoreTable) catalog.getTable(identifier);
+        write = table.newStreamWriteBuilder().withCommitUser(commitUser).newWrite();
+        commit = table.newCommit(commitUser);
+        for (int i = 0; i < 100; i++) {
+            write.write(
+                    GenericRow.of(
+                            2,
+                            BinaryString.fromString("batch2"),
+                            new org.apache.paimon.data.BlobData(blobBytes)));
+        }
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // Compact
+        table = (FileStoreTable) catalog.getTable(identifier);
+        org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator coordinator =
+                new org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator(
+                        table, false, false);
+        List<org.apache.paimon.append.dataevolution.DataEvolutionCompactTask> tasks =
+                coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (org.apache.paimon.append.dataevolution.DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        StreamTableCommit compactCommit = table.newCommit(commitUser);
+        compactCommit.commit(2, compactMessages);
+        compactCommit.close();
+
+        FileStoreTable readTable = (FileStoreTable) catalog.getTable(identifier);
+        List<Split> splits = new ArrayList<>(readTable.newSnapshotReader().read().dataSplits());
+        TableRead read = readTable.newRead();
+        RowType rowType = readTable.rowType();
+        List<String> res = getResult(read, splits, row -> internalRowToString(row, rowType));
+        assertThat(res).hasSize(200);
+    }
+
     // Helper method from TableTestBase
     protected Identifier identifier(String tableName) {
         return new Identifier(database, tableName);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 736ce904cf54..e3019485bb2d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.append;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
@@ -33,10 +35,14 @@
 import org.apache.paimon.operation.DataEvolutionSplitRead;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.RowTrackingTable;
 import org.apache.paimon.types.DataField;
@@ -45,6 +51,7 @@
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.UriReader;
 
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -512,6 +519,238 @@ public void testReadRowTrackingWithBlobProjection() throws Exception {
         assertThat(projectedCount.get()).isEqualTo(1);
     }
 
+    @Test
+    void testReadBlobAfterAlterTableAndCompaction() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Step 1: write data with schemaId=0
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 2: ALTER TABLE SET an unrelated option -> schemaId becomes 1
+        catalog.alterTable(
+                identifier(), SchemaChange.setOption("snapshot.num-retained.min", "5"), false);
+
+        // Step 3: write more data with schemaId=1
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 4: compact blob table using DataEvolutionCompactCoordinator
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        // Step 5: read after compaction
+        readDefault(row -> assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes));
+    }
+
+    @Test
+    void testReadBlobAfterAddColumnAndCompaction() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    1, BinaryString.fromString("batch1"), new BlobData(blobBytes)));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        catalog.alterTable(identifier(), SchemaChange.addColumn("f3", DataTypes.STRING()), false);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    2,
+                                    BinaryString.fromString("batch2"),
+                                    new BlobData(blobBytes),
+                                    BinaryString.fromString("after-add")));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        AtomicInteger batch1Count = new AtomicInteger(0);
+        AtomicInteger batch2Count = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    if (row.getInt(0) == 1) {
+                        batch1Count.incrementAndGet();
+                    } else if (row.getInt(0) == 2) {
+                        batch2Count.incrementAndGet();
+                    }
+                });
+        assertThat(batch1Count.get()).isEqualTo(100);
+        assertThat(batch2Count.get()).isEqualTo(100);
+    }
+
+    @Test
+    void testReadBlobAfterDropColumnAndCompaction() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.column("f3", DataTypes.STRING());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    1,
+                                    BinaryString.fromString("batch1"),
+                                    new BlobData(blobBytes),
+                                    BinaryString.fromString("before-drop")));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        catalog.alterTable(identifier(), SchemaChange.dropColumn("f3"), false);
+
+        {
+            FileStoreTable t = getTableDefault();
+            StreamWriteBuilder b = t.newStreamWriteBuilder().withCommitUser(commitUser);
+            try (StreamTableWrite w = b.newWrite()) {
+                for (int j = 0; j < 100; j++) {
+                    w.write(
+                            GenericRow.of(
+                                    2, BinaryString.fromString("batch2"), new BlobData(blobBytes)));
+                }
+                commitDefault(w.prepareCommit(false, Long.MAX_VALUE));
+            }
+        }
+
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        AtomicInteger batch1Count = new AtomicInteger(0);
+        AtomicInteger batch2Count = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    if (row.getInt(0) == 1) {
+                        batch1Count.incrementAndGet();
+                    } else if (row.getInt(0) == 2) {
+                        batch2Count.incrementAndGet();
+                    }
+                });
+        assertThat(batch1Count.get()).isEqualTo(100);
+        assertThat(batch2Count.get()).isEqualTo(100);
+    }
+
+    @Disabled("Reproduce: rename blob column causes read failure after compaction")
+    @Test
+    void testRenameBlobColumnReadFailure() throws Exception {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BLOB());
+        schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        catalog.createTable(identifier(), schemaBuilder.build(), true);
+
+        // Step 1: write blob data — blob files record writeCols=["f2"]
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 2: rename blob column f2 -> f2_renamed
+        catalog.alterTable(identifier(), SchemaChange.renameColumn("f2", "f2_renamed"), false);
+
+        // Step 3: write more data — new blob files have writeCols=["f2_renamed"]
+        commitDefault(writeDataDefault(100, 1));
+
+        // Step 4: compact merges files into the same split
+        FileStoreTable table = getTableDefault();
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false, false);
+        List<DataEvolutionCompactTask> tasks = coordinator.plan();
+        assertThat(tasks.size()).isGreaterThan(0);
+        List<CommitMessage> compactMessages = new ArrayList<>();
+        for (DataEvolutionCompactTask task : tasks) {
+            compactMessages.add(task.doCompact(table, commitUser));
+        }
+        commitDefault(compactMessages);
+
+        // Step 5: read fails — the merged bunch mixes blob files with different write columns
+        assertThatThrownBy(() -> readDefault(row -> {}))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("All files in this bunch should have the same write columns");
+    }
+
+    @Test
+    void testRenameBlobColumnShouldFail() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(10, 1));
+
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier(),
+                                        SchemaChange.renameColumn("f2", "f2_renamed"),
+                                        false))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Cannot rename BLOB column");
+    }
+
     private void createExternalStorageTable() throws Exception {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index 4e85b1e3dd54..9bb9a7274ed1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -297,6 +297,36 @@ private DataFileMeta createBlobFile(
                 fileName, firstRowId, rowCount, maxSequenceNumber, Arrays.asList("blob_col"));
     }
 
+    /** Creates a blob file with a specified schemaId. */
+    private DataFileMeta createBlobFileWithSchema(
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long maxSequenceNumber,
+            long schemaId) {
+        return DataFileMeta.create(
+                fileName + ".blob",
+                rowCount,
+                rowCount,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0,
+                maxSequenceNumber,
+                schemaId,
+                DataFileMeta.DUMMY_LEVEL,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                rowCount,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                Arrays.asList("blob_col"));
+    }
+
     /** Creates a blob file with custom write columns.
*/
     private DataFileMeta createBlobFileWithCols(
             String fileName,
@@ -327,6 +357,20 @@ private DataFileMeta createBlobFileWithCols(
                 writeCols);
     }
 
+    @Test
+    void testAddBlobFilesWithDifferentSchemaId() {
+        DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 0L);
+        DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 1, 1L);
+
+        blobBunch.add(blobEntry1);
+        assertThatCode(() -> blobBunch.add(blobEntry2)).doesNotThrowAnyException();
+
+        assertThat(blobBunch.files).hasSize(2);
+        assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
+        assertThat(blobBunch.files.get(1).schemaId()).isEqualTo(1L);
+        assertThat(blobBunch.rowCount()).isEqualTo(300);
+    }
+
     @Test
     public void testRowIdPushDown() {
         SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index f98bf32747b0..3366a5ff1c2d 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -290,6 +290,29 @@ run_lumina_vector_test() {
     fi
 }
 
+run_blob_alter_compact_test() {
+    echo -e "${YELLOW}=== Running Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read) ===${NC}"
+
+    cd "$PROJECT_ROOT"
+
+    echo "Running Maven test for JavaPyE2ETest.testBlobWriteAlterCompact..."
+    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testBlobWriteAlterCompact -pl paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java blob write+alter+compact test completed successfully${NC}"
+    else
+        echo -e "${RED}✗ Java blob write+alter+compact test failed${NC}"
+        return 1
+    fi
+    cd "$PAIMON_PYTHON_DIR"
+    echo "Running Python test for JavaPyReadWriteTest.test_read_blob_after_alter_and_compact..."
+    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_read_blob_after_alter_and_compact -v; then
+        echo -e "${GREEN}✓ Python blob read test completed successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python blob read test failed${NC}"
+        return 1
+    fi
+}
+
 # Main execution
 main() {
     local java_write_result=0
@@ -301,6 +324,7 @@ main() {
     local compressed_text_result=0
     local tantivy_fulltext_result=0
     local lumina_vector_result=0
+    local blob_alter_compact_result=0
 
     # Detect Python version
     PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")
@@ -383,6 +407,13 @@ main() {
 
     echo ""
 
+    # Run blob alter+compact test (Java write+alter+compact, Python read)
+    if ! run_blob_alter_compact_test; then
+        blob_alter_compact_result=1
+    fi
+
+    echo ""
+
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
     if [[ $java_write_result -eq 0 ]]; then
@@ -439,12 +470,18 @@ main() {
         echo -e "${RED}✗ Lumina Vector Index Test (Java Write, Python Read): FAILED${NC}"
     fi
 
+    if [[ $blob_alter_compact_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): PASSED${NC}"
+    else
+        echo -e "${RED}✗ Blob Alter+Compact Test (Java Write+Alter+Compact, Python Read): FAILED${NC}"
+    fi
+
     echo ""
 
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $blob_alter_compact_result -eq 0 ]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py b/paimon-python/pypaimon/read/reader/field_bunch.py
index fccf2cb3250f..f9f2dca3cec9 100644
--- a/paimon-python/pypaimon/read/reader/field_bunch.py
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -99,10 +99,11 @@ def add(self, file: DataFileMeta) -> None:
                 )
 
         if self._files:
-            if file.schema_id != self._files[0].schema_id:
-                raise ValueError(
-                    "All files in a blob bunch should have the same schema id."
-                )
+            if not DataFileMeta.is_blob_file(file.file_name):
+                if file.schema_id != self._files[0].schema_id:
+                    raise ValueError(
+                        "All files in a blob bunch should have the same schema id."
+                    )
             if file.write_cols != self._files[0].write_cols:
                 raise ValueError(
                     "All files in a blob bunch should have the same write columns."
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 0a18c77630ad..721925dc3312 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -136,6 +136,22 @@ def _assert_not_updating_primary_keys(
             raise ValueError(f"Cannot {operation} primary key")
 
 
+def _assert_not_renaming_blob_column(
+        new_fields: List[DataField], field_names: List[str]):
+    """Raise ValueError when a top-level BLOB column is about to be renamed."""
+    if len(field_names) > 1:
+        return
+    field_name = field_names[0]
+    for field in new_fields:
+        # Compare only the leading type name so a nullability suffix such as
+        # "BLOB NOT NULL" (TODO confirm str() format) cannot bypass the check.
+        type_name = str(field.type).split(' ', 1)[0]
+        if field.name == field_name and type_name == 'BLOB':
+            raise ValueError(
+                f"Cannot rename BLOB column: [{field_name}]"
+            )
+
+
 def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]):
     field_name = change.field_names[-1]
     new_name = change.new_name
@@ -355,6 +371,7 @@ def _generate_table_schema(
                 _assert_not_updating_partition_keys(
                     old_table_schema, change.field_names, "rename"
                 )
+                _assert_not_renaming_blob_column(new_fields, change.field_names)
                 _handle_rename_column(change, new_fields)
             elif isinstance(change, DropColumn):
                 _drop_column_validation(old_table_schema, change)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 0d3bf49cbdb6..7e4d6c6d26a1 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -24,6 +24,7 @@
 import pyarrow as pa
 
 from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.schema_change import SchemaChange
 from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.write.commit_message import CommitMessage
 
@@ -3024,6 +3025,30 @@ def test_blob_with_row_id_equal(self):
         result = read.to_arrow(splits)
         self.assertEqual(result.num_rows, 1)
 
+    def test_rename_blob_column_should_fail(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_col', pa.large_binary()),
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+            }
+        )
+        self.catalog.create_table('test_db.blob_rename_test', schema, False)
+
+        with self.assertRaises(RuntimeError) as ctx:
+            self.catalog.alter_table(
+                'test_db.blob_rename_test',
+                [SchemaChange.rename_column('blob_col', 'blob_col_renamed')],
+                False
+            )
+        self.assertIn('Cannot rename BLOB column', str(ctx.exception))
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 5cc7d549254a..4a61b9a06780 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -602,3 +602,12 @@ def test_read_lumina_vector_index(self):
         ids = pa_table.column('id').to_pylist()
         print(f"Lumina vector search matched rows: ids={ids}")
         self.assertIn(0, ids)
+
+    def test_read_blob_after_alter_and_compact(self):
+        table = self.catalog.get_table('default.blob_alter_compact_test')
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+        result = table_read.to_arrow(splits)
+        self.assertEqual(result.num_rows, 200)