@@ -548,9 +548,11 @@ void add(DataFileMeta file) {
             }
         }
         if (!files.isEmpty()) {
-            checkArgument(
-                    file.schemaId() == files.get(0).schemaId(),
-                    "All files in this bunch should have the same schema id.");
+            if (!isBlobFile(file.fileName())) {
+                checkArgument(
+                        file.schemaId() == files.get(0).schemaId(),
+                        "All files in this bunch should have the same schema id.");
+            }
             checkArgument(
                     file.writeCols().equals(files.get(0).writeCols()),
                     "All files in this bunch should have the same write columns.");
@@ -42,6 +42,7 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
@@ -404,6 +405,7 @@ protected void updateLastColumn(
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
assertNotUpdatingPartitionKeys(oldTableSchema, rename.fieldNames(), "rename");
assertNotRenamingBlobColumn(newFields, rename.fieldNames());
new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
@Override
protected void updateLastColumn(
@@ -908,6 +910,19 @@ private static void assertNotUpdatingPrimaryKeys(
}
}

private static void assertNotRenamingBlobColumn(List<DataField> fields, String[] fieldNames) {
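        // Multi-part field names address nested fields; blob columns are assumed
        // to be top-level, so only single-name renames need checking here.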
if (fieldNames.length > 1) {
return;
}
String fieldName = fieldNames[0];
for (DataField field : fields) {
if (field.name().equals(fieldName) && field.type().is(DataTypeRoot.BLOB)) {
throw new UnsupportedOperationException(
String.format("Cannot rename BLOB column: [%s]", fieldName));
}
}
}

private abstract static class NestedColumnModifier {

private final String[] updateFieldNames;
@@ -19,6 +19,8 @@
package org.apache.paimon.append;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
@@ -33,10 +35,12 @@
import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.system.RowTrackingTable;
import org.apache.paimon.types.DataField;
@@ -45,6 +49,7 @@
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.UriReader;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -512,6 +517,99 @@ public void testReadRowTrackingWithBlobProjection() throws Exception {
assertThat(projectedCount.get()).isEqualTo(1);
}

@Test
void testReadBlobAfterAlterTableAndCompaction() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
schemaBuilder.column("f1", DataTypes.STRING());
schemaBuilder.column("f2", DataTypes.BLOB());
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
catalog.createTable(identifier(), schemaBuilder.build(), true);

// Step 1: write data with schemaId=0
commitDefault(writeDataDefault(100, 1));

// Step 2: ALTER TABLE SET an unrelated option -> schemaId becomes 1
catalog.alterTable(
identifier(), SchemaChange.setOption("snapshot.num-retained.min", "5"), false);

// Step 3: write more data with schemaId=1
commitDefault(writeDataDefault(100, 1));

// Step 4: compact blob table using DataEvolutionCompactCoordinator
FileStoreTable table = getTableDefault();
DataEvolutionCompactCoordinator coordinator =
new DataEvolutionCompactCoordinator(table, false, false);
List<DataEvolutionCompactTask> tasks = coordinator.plan();
assertThat(tasks.size()).isGreaterThan(0);
List<CommitMessage> compactMessages = new ArrayList<>();
for (DataEvolutionCompactTask task : tasks) {
compactMessages.add(task.doCompact(table, commitUser));
}
commitDefault(compactMessages);

// Step 5: read after compaction; blob files from schema 0 and schema 1 now share a bunch
readDefault(row -> assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes));
}

@Disabled("Reproduce: rename blob column causes read failure after compaction")
@Test
void testRenameBlobColumnReadFailure() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
schemaBuilder.column("f1", DataTypes.STRING());
schemaBuilder.column("f2", DataTypes.BLOB());
schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "100 MB");
schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
catalog.createTable(identifier(), schemaBuilder.build(), true);

// Step 1: write blob data — blob files record writeCols=["f2"]
commitDefault(writeDataDefault(100, 1));

// Step 2: rename blob column f2 -> f2_renamed
catalog.alterTable(identifier(), SchemaChange.renameColumn("f2", "f2_renamed"), false);

// Step 3: write more data — new blob files have writeCols=["f2_renamed"]
commitDefault(writeDataDefault(100, 1));

// Step 4: compact merges files into the same split
FileStoreTable table = getTableDefault();
DataEvolutionCompactCoordinator coordinator =
new DataEvolutionCompactCoordinator(table, false, false);
List<DataEvolutionCompactTask> tasks = coordinator.plan();
assertThat(tasks.size()).isGreaterThan(0);
List<CommitMessage> compactMessages = new ArrayList<>();
for (DataEvolutionCompactTask task : tasks) {
compactMessages.add(task.doCompact(table, commitUser));
}
commitDefault(compactMessages);

// Step 5: read fails: the compacted bunch now mixes blob files with different write columns
assertThatThrownBy(() -> readDefault(row -> {}))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("All files in this bunch should have the same write columns");
}
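
    // With assertNotRenamingBlobColumn in place, the rename above is rejected up
    // front, before any mismatched blob file can be written; the next test verifies this.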

@Test
void testRenameBlobColumnShouldFail() throws Exception {
createTableDefault();
commitDefault(writeDataDefault(10, 1));

assertThatThrownBy(
() ->
catalog.alterTable(
identifier(),
SchemaChange.renameColumn("f2", "f2_renamed"),
false))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Cannot rename BLOB column");
}

private void createExternalStorageTable() throws Exception {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
@@ -297,6 +297,36 @@ private DataFileMeta createBlobFile(
fileName, firstRowId, rowCount, maxSequenceNumber, Arrays.asList("blob_col"));
}

/** Creates a blob file with a specified schemaId. */
private DataFileMeta createBlobFileWithSchema(
String fileName,
long firstRowId,
long rowCount,
long maxSequenceNumber,
long schemaId) {
return DataFileMeta.create(
fileName + ".blob",
rowCount,
rowCount,
DataFileMeta.EMPTY_MIN_KEY,
DataFileMeta.EMPTY_MAX_KEY,
SimpleStats.EMPTY_STATS,
SimpleStats.EMPTY_STATS,
0,
maxSequenceNumber,
schemaId,
DataFileMeta.DUMMY_LEVEL,
Collections.emptyList(),
Timestamp.fromEpochMillis(System.currentTimeMillis()),
rowCount,
null,
FileSource.APPEND,
null,
null,
firstRowId,
Arrays.asList("blob_col"));
}

/** Creates a blob file with custom write columns. */
private DataFileMeta createBlobFileWithCols(
String fileName,
@@ -327,6 +357,20 @@ private DataFileMeta createBlobFileWithCols(
writeCols);
}

@Test
void testAddBlobFilesWithDifferentSchemaId() {
DataFileMeta blobEntry1 = createBlobFileWithSchema("blob1", 0, 100, 1, 0L);
DataFileMeta blobEntry2 = createBlobFileWithSchema("blob2", 100, 200, 1, 1L);

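        // Blob files are exempt from the same-schema-id check, so mixing schema
        // ids 0 and 1 in one bunch must not throw; write columns still have to match.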
blobBunch.add(blobEntry1);
assertThatCode(() -> blobBunch.add(blobEntry2)).doesNotThrowAnyException();

assertThat(blobBunch.files).hasSize(2);
assertThat(blobBunch.files.get(0).schemaId()).isEqualTo(0L);
assertThat(blobBunch.files.get(1).schemaId()).isEqualTo(1L);
assertThat(blobBunch.rowCount()).isEqualTo(300);
}

@Test
public void testRowIdPushDown() {
SpecialFieldBunch blobBunch = new SpecialFieldBunch(Long.MAX_VALUE, true);