diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java index 8fee740fd1cd..7053c620afe8 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -342,6 +342,7 @@ public boolean equals(Object o) { } TableSchema tableSchema = (TableSchema) o; return version == tableSchema.version + && highestFieldId == tableSchema.highestFieldId && Objects.equals(fields, tableSchema.fields) && Objects.equals(partitionKeys, tableSchema.partitionKeys) && Objects.equals(primaryKeys, tableSchema.primaryKeys) @@ -356,6 +357,16 @@ public int hashCode() { version, fields, partitionKeys, primaryKeys, options, comment, timeMillis); } + /** Checks if two schemas have the same content, ignoring version and timeMillis. */ + public boolean sameContent(TableSchema other) { + return Objects.equals(fields, other.fields) + && highestFieldId == other.highestFieldId + && Objects.equals(partitionKeys, other.partitionKeys) + && Objects.equals(primaryKeys, other.primaryKeys) + && Objects.equals(options, other.options) + && Objects.equals(comment, other.comment); + } + public static List newFields(RowType rowType) { return rowType.getFields(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 670960889d5c..d081872dc81a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -60,6 +60,9 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.apache.paimon.shade.guava30.com.google.common.collect.Streams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -110,7 +113,8 @@ @ThreadSafe public class SchemaManager implements Serializable { - public static final String SCHEMA_PREFIX = "schema-"; + private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); + private static final String SCHEMA_PREFIX = "schema-"; private final FileIO fileIO; private final Path tableRoot; @@ -266,8 +270,15 @@ public TableSchema commitChanges(List changes) tableRoot.toString(), true, branch))); LazyField lazyIdentifier = new LazyField<>(() -> identifierFromPath(tableRoot.toString(), true, branch)); - TableSchema newTableSchema = + Optional newTableSchemaOpt = generateTableSchema(oldTableSchema, changes, hasSnapshots, lazyIdentifier); + if (!newTableSchemaOpt.isPresent()) { + LOG.info( + "No schema change detected for table {}. Skipping schema update.", + lazyIdentifier.get()); + return oldTableSchema; + } + TableSchema newTableSchema = newTableSchemaOpt.get(); try { boolean success = commit(newTableSchema); if (success) { @@ -279,7 +290,7 @@ public TableSchema commitChanges(List changes) } } - public static TableSchema generateTableSchema( + public static Optional generateTableSchema( TableSchema oldTableSchema, List changes, LazyField hasSnapshots, @@ -314,7 +325,98 @@ public static TableSchema generateTableSchema( AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); String newComment = oldTableSchema.comment(); List newPrimaryKeys = oldTableSchema.primaryKeys(); + + // Filter out ineffective changes + List effectiveChanges = new ArrayList<>(); for (SchemaChange change : changes) { + if (change instanceof SetOption) { + SetOption setOption = (SetOption) change; + String oldValue = oldOptions.get(setOption.key()); + if (oldValue == null || !oldValue.equals(setOption.value())) { + effectiveChanges.add(change); + } + } else if (change instanceof RemoveOption) { + RemoveOption removeOption = (RemoveOption) change; + if (oldOptions.containsKey(removeOption.key())) { + effectiveChanges.add(change); + } + } else if (change instanceof UpdateComment) { + UpdateComment updateComment = (UpdateComment) change; + if (!Objects.equals(oldTableSchema.comment(), updateComment.comment())) { + effectiveChanges.add(change); + } + } else if (change instanceof RenameColumn) { + RenameColumn rename = (RenameColumn) change; + DataField field = findField(oldTableSchema.fields(), rename.fieldNames()); + if (field != null && !field.name().equals(rename.newName())) { + effectiveChanges.add(change); + } + } else if (change instanceof UpdateColumnType) { + UpdateColumnType update = (UpdateColumnType) change; + DataField field = findField(oldTableSchema.fields(), update.fieldNames()); + if (field != null) { + DataType oldType = field.type(); + DataType newType = update.newDataType(); + if (update.keepNullability()) { + newType = newType.copy(oldType.isNullable()); + } + if (!oldType.equals(newType)) { + effectiveChanges.add(change); + } + } + } else if (change instanceof UpdateColumnNullability) { + UpdateColumnNullability update = (UpdateColumnNullability) change; + DataField field = findField(oldTableSchema.fields(), update.fieldNames()); + if (field != null) { + DataType oldType = field.type(); + DataType sourceRootType = + getRootType( + oldType, + update.fieldNames().length - 1, + update.fieldNames().length); + if (sourceRootType.isNullable() != update.newNullability()) { + effectiveChanges.add(change); + } + } + } else if (change instanceof UpdateColumnComment) { + UpdateColumnComment update = (UpdateColumnComment) change; + DataField field = findField(oldTableSchema.fields(), update.fieldNames()); + if (field != null + && !Objects.equals(field.description(), update.newDescription())) { + effectiveChanges.add(change); + } + } else if (change instanceof UpdateColumnPosition) { + UpdateColumnPosition update = (UpdateColumnPosition) change; + SchemaChange.Move move = update.move(); + String fieldName = move.fieldName(); + DataField field = findFieldByName(newFields, fieldName); + if (field != null) { + int currentIndex = -1; + for (int i = 0; i < newFields.size(); i++) { + if (newFields.get(i).name().equals(fieldName)) { + currentIndex = i; + break; + } + } + int newIndex = calculateNewPosition(newFields, move); + if (currentIndex != newIndex) { + effectiveChanges.add(change); + } + } + } else if (change instanceof UpdateColumnDefaultValue) { + UpdateColumnDefaultValue update = (UpdateColumnDefaultValue) change; + DataField field = findField(oldTableSchema.fields(), update.fieldNames()); + if (field != null + && !Objects.equals(field.defaultValue(), update.newDefaultValue())) { + effectiveChanges.add(change); + } + } else { + // AddColumn and DropColumn always change the schema + effectiveChanges.add(change); + } + } + + for (SchemaChange change : effectiveChanges) { if (change instanceof SetOption) { SetOption setOption = (SetOption) change; if (hasSnapshots.get()) { @@ -571,18 +673,76 @@ protected void updateLastColumn( newFields, oldTableSchema.partitionKeys(), applyNotNestedColumnRename( - newPrimaryKeys, Iterables.filter(changes, RenameColumn.class)), - applyRenameColumnsToOptions(newOptions, changes), + newPrimaryKeys, + Iterables.filter(effectiveChanges, RenameColumn.class)), + applyRenameColumnsToOptions(newOptions, effectiveChanges), newComment); - return new TableSchema( - oldTableSchema.id() + 1, - newSchema.fields(), - highestFieldId.get(), - newSchema.partitionKeys(), - newSchema.primaryKeys(), - newSchema.options(), - newSchema.comment()); + TableSchema newTableSchema = + new TableSchema( + oldTableSchema.id() + 1, + newSchema.fields(), + highestFieldId.get(), + newSchema.partitionKeys(), + newSchema.primaryKeys(), + newSchema.options(), + newSchema.comment()); + + if (oldTableSchema.sameContent(newTableSchema)) { + return Optional.empty(); + } + return Optional.of(newTableSchema); + } + + private static DataField findField(List fields, String[] fieldNames) { + if (fieldNames.length == 0) { + return null; + } + String firstName = fieldNames[0]; + for (DataField field : fields) { + if (field.name().equals(firstName)) { + if (fieldNames.length == 1) { + return field; + } + // Handle nested fields + if (field.type() instanceof RowType) { + return findField( + ((RowType) field.type()).getFields(), + Arrays.copyOfRange(fieldNames, 1, fieldNames.length)); + } + } + } + return null; + } + + private static DataField findFieldByName(List fields, String fieldName) { + for (DataField field : fields) { + if (field.name().equals(fieldName)) { + return field; + } + } + return null; + } + + private static int calculateNewPosition(List fields, SchemaChange.Move move) { + if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) { + return 0; + } else if (move.type().equals(SchemaChange.Move.MoveType.LAST)) { + return fields.size() - 1; + } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) { + for (int i = 0; i < fields.size(); i++) { + if (fields.get(i).name().equals(move.referenceFieldName())) { + return i + 1; + } + } + } else if (move.type().equals(SchemaChange.Move.MoveType.BEFORE)) { + for (int i = 0; i < fields.size(); i++) { + if (fields.get(i).name().equals(move.referenceFieldName())) { + return i; + } + } + } + return -1; } // gets the rootType at the defined depth diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 5961101c3a55..a6c50c835665 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -2455,10 +2455,12 @@ protected void alterTableImpl(Identifier identifier, List changes) if (isFormatTable(schema.toSchema())) { TableSchema newSchema = SchemaManager.generateTableSchema( - schema, - changes, - new LazyField<>(() -> false), - new LazyField<>(() -> identifier)); + schema, + changes, + new LazyField<>(() -> false), + new LazyField<>(() -> identifier)) + .orElse(schema); + TableMetadata newTableMetadata = createTableMetadata( identifier, diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 9a15cab36159..b456cd8961ff 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -939,4 +939,36 @@ public void testRollbackSchemaNotExist() throws Exception { new ChangelogManager(LocalFileIO.create(), path, null))) .hasMessageContaining("Schema 999 does not exist"); } + + @Test + public void testNoChangeCommitDoesNotCreateNewSchema() throws Exception { + // Create table with an initial option foo=bar + Map initialOptions = new HashMap<>(); + initialOptions.put("foo", "bar"); + Schema schemaWithOption = + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.emptyList(), + initialOptions, + ""); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), path); + manager.createTable(schemaWithOption); + + long initialSchemaId = manager.latest().get().id(); + assertThat(manager.latest().get().options()).containsEntry("foo", "bar"); + + // Set option foo=bar again (no actual change) + manager.commitChanges(SchemaChange.setOption("foo", "bar")); + + // Verify no new schema is created when value didn't change + long newSchemaId = manager.latest().get().id(); + assertThat(newSchemaId).isEqualTo(initialSchemaId); + assertThat(manager.latest().get().options()).containsEntry("foo", "bar"); + + // Also test with UpdateComment when comment is unchanged + String initialComment = manager.latest().get().comment(); + manager.commitChanges(SchemaChange.updateComment(initialComment)); + assertThat(manager.latest().get().id()).isEqualTo(initialSchemaId); + } }