From cdf523f9eca2790e3daaf5e47292ac9f9f1c4cc8 Mon Sep 17 00:00:00 2001 From: zt18014 Date: Mon, 30 Mar 2026 19:05:42 +0800 Subject: [PATCH] [FLINK-39354] Fix SchemaUtils.applyAddColumnEvent fails when referenced existing column name differs only by case --- .../flink/cdc/common/utils/SchemaUtils.java | 24 ++++++++++++++----- .../cdc/common/utils/SchemaUtilsTest.java | 23 ++++++++++++++++++ 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 8ef9cd298be..8ea047bf92a 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -138,9 +138,8 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema Preconditions.checkNotNull( columnWithPosition.getExistedColumnName(), "existedColumnName could not be null in BEFORE type AddColumnEvent"); - List columnNames = - columns.stream().map(Column::getName).collect(Collectors.toList()); - int index = columnNames.indexOf(columnWithPosition.getExistedColumnName()); + int index = + findColumnIndex(columns, columnWithPosition.getExistedColumnName()); if (index < 0) { throw new IllegalArgumentException( String.format( @@ -156,9 +155,8 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema Preconditions.checkNotNull( columnWithPosition.getExistedColumnName(), "existedColumnName could not be null in AFTER type AddColumnEvent"); - List columnNames = - columns.stream().map(Column::getName).collect(Collectors.toList()); - int index = columnNames.indexOf(columnWithPosition.getExistedColumnName()); + int index = + findColumnIndex(columns, columnWithPosition.getExistedColumnName()); if (index < 0) { throw new IllegalArgumentException( String.format( @@ -174,6 +172,20 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema return oldSchema.copy(columns); } + private static int findColumnIndex(List columns, String existedColumnName) { + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equals(existedColumnName)) { + return i; + } + } + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getName().equalsIgnoreCase(existedColumnName)) { + return i; + } + } + return -1; + } + private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) { List columns = oldSchema.getColumns().stream() diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java index a5f3f86adda..bd3a0a4abb4 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java +++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java @@ -204,6 +204,29 @@ void testApplyColumnSchemaChangeEvent() { .build()); } + @Test + void testApplyAddColumnEventWithCaseInsensitiveExistingColumnName() { + TableId tableId = TableId.parse("gc_fps_receivable.billstat"); + Schema schema = + Schema.newBuilder().physicalColumn("changetype", DataTypes.STRING()).build(); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("origintypecode", DataTypes.STRING()), + AddColumnEvent.ColumnPosition.AFTER, + "ChangeType")); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + + schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); + Assertions.assertThat(schema) + .isEqualTo( + Schema.newBuilder() + .physicalColumn("changetype", DataTypes.STRING()) + .physicalColumn("origintypecode", DataTypes.STRING()) + .build()); + } + @Test void testGetNumericPrecision() { Assertions.assertThat(SchemaUtils.getNumericPrecision(DataTypes.TINYINT())).isEqualTo(3);