diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md index 85b14141bef..2d7f3057b62 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md @@ -441,6 +441,19 @@ Flink SQL> SELECT * FROM orders; 如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。
警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。 + + + + scan.incremental.snapshot.string-key.compare-mode + optional + default + String + + 增量快照阶段字符串类型 chunk key 的比较模式。可选值包括: +
  • default:使用 Java 默认字符串比较(区分大小写,Unicode 码点顺序)。可能与 MySQL 的 utf8mb4_general_ci 等大小写不敏感排序规则不一致。
  • +
  • case-insensitive:使用大小写不敏感比较(Java 中的 compareToIgnoreCase),适用于 MySQL 使用 utf8mb4_general_ci 等大小写不敏感排序规则的场景。
  • +
  • binary:在 SQL 查询和 Java 比较中均使用二进制比较,强制按字节精确匹配。适用于 MySQL 使用 utf8mb4_binBINARY 关键字等二进制比较的场景。
  • + use.legacy.json.format diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md index 04ec2844551..4e17966d204 100644 --- a/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content.zh/docs/connectors/pipeline-connectors/mysql.md @@ -340,6 +340,19 @@ pipeline: 如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。
    警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。 + + + + scan.incremental.snapshot.string-key.compare-mode + optional + default + String + + 增量快照阶段字符串类型 chunk key 的比较模式。可选值包括: +
  • default:使用 Java 默认字符串比较(区分大小写,Unicode 码点顺序)。可能与 MySQL 的 utf8mb4_general_ci 等大小写不敏感排序规则不一致。
  • +
  • case-insensitive:使用大小写不敏感比较(Java 中的 compareToIgnoreCase),适用于 MySQL 使用 utf8mb4_general_ci 等大小写不敏感排序规则的场景。
  • +
  • binary:在 SQL 查询和 Java 比较中均使用二进制比较,强制按字节精确匹配。适用于 MySQL 使用 utf8mb4_binBINARY 关键字等二进制比较的场景。
  • + metadata.list diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 2d47d9b7d71..e0e17b0ca8b 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -468,6 +468,18 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially. + + scan.incremental.snapshot.string-key.compare-mode + optional + default + String + + The compare mode for string type chunk key during incremental snapshot phase. Available values are: +
  • default: Use Java's default string comparison (case-sensitive, Unicode code point order). This may be inconsistent with MySQL's case-insensitive collations like utf8mb4_general_ci.
  • +
  • case-insensitive: Use case-insensitive comparison (compareToIgnoreCase in Java), suitable when MySQL uses case-insensitive collation such as utf8mb4_general_ci.
  • +
  • binary: Use binary comparison in both SQL queries and Java comparison, forcing byte-level exact match. Suitable when MySQL uses binary collation like utf8mb4_bin or BINARY keyword.
  • + + diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index e6e61e20720..6d022342f03 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -362,6 +362,18 @@ pipeline: For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially. + + scan.incremental.snapshot.string-key.compare-mode + optional + default + String + + The compare mode for string type chunk key during incremental snapshot phase. Available values are: +
  • default: Use Java's default string comparison (case-sensitive, Unicode code point order). This may be inconsistent with MySQL's case-insensitive collations like utf8mb4_general_ci.
  • +
  • case-insensitive: Use case-insensitive comparison (compareToIgnoreCase in Java), suitable when MySQL uses case-insensitive collation such as utf8mb4_general_ci.
  • +
  • binary: Use binary comparison in both SQL queries and Java comparison, forcing byte-level exact match. Suitable when MySQL uses binary collation like utf8mb4_bin or BINARY keyword.
  • + + metadata.list optional diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 1b3540da0bb..0ba75bc692e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -80,6 +81,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; @@ -167,6 +169,9 @@ public DataSource createDataSource(Context context) { boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT); boolean isAssignUnboundedChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); + ChunkKeyCompareMode chunkKeyCompareMode = + ChunkKeyCompareMode.fromValue( + config.get(SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE)); validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); @@ -220,6 +225,7 @@ public DataSource createDataSource(Context context) { .treatTinyInt1AsBoolean(treatTinyInt1AsBoolean) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst) + .chunkKeyCompareMode(chunkKeyCompareMode) .skipSnapshotBackfill(skipSnapshotBackfill); List tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); @@ -358,6 +364,7 @@ public Set> optionalOptions() { options.add(PARSE_ONLINE_SCHEMA_CHANGES); options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + options.add(SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..b34233bbb18 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -224,6 +224,19 @@ public class MySqlDataSourceOptions { + "By default, the chunk key is the first column of the primary key." + "eg. db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2;"); + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE = + ConfigOptions.key("scan.incremental.snapshot.string-key.compare-mode") + .stringType() + .defaultValue("default") + .withDescription( + "The compare mode for string chunk key during incremental snapshot. Supported values are: " + + "'default' (uses Java String.compareTo), " + + "'case-insensitive' (uses Java String.compareToIgnoreCase to align with MySQL case-insensitive collations), " + + "'binary' (forces binary comparison in both MySQL SQL and Java). " + + "The 'case-insensitive' mode is recommended for tables using utf8mb4_general_ci or similar case-insensitive collations. " + + "The 'binary' mode ensures consistent byte-level ordering but may impact query performance during chunk splitting."); + @Experimental public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = ConfigOptions.key("scan.incremental.close-idle-reader.enabled") diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index b1e6d1dfc86..4e5054d5e6a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask; import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; @@ -93,6 +94,7 @@ public class BinlogSplitReader implements DebeziumReader> pendingSchemaChangeEvents; private static final long READER_CLOSE_TIMEOUT = 30L; @@ -116,6 +118,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) this.isParsingOnLineSchemaChanges = statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges(); this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill(); + this.chunkKeyCompareMode = statefulTaskContext.getSourceConfig().getChunkKeyCompareMode(); this.pendingSchemaChangeEvents = new HashMap<>(); } @@ -316,7 +319,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) { FinishedSnapshotSplitInfo matchedSplit = SplitKeyUtils.findSplitByKeyBinary( - finishedSplitsInfo.get(tableId), chunkKey); + finishedSplitsInfo.get(tableId), chunkKey, chunkKeyCompareMode); return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark()); } @@ -381,7 +384,12 @@ private void configureFilter() { } // Sort splits by splitStart for binary search optimization // Binary search requires sorted data to work correctly - splitsInfoMap.values().forEach(SplitKeyUtils::sortFinishedSplitInfos); + splitsInfoMap + .values() + .forEach( + splits -> + SplitKeyUtils.sortFinishedSplitInfos( + splits, chunkKeyCompareMode)); } this.finishedSplitsInfo = splitsInfoMap; this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index e3549237065..4f696b0dc92 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -360,7 +360,8 @@ public Iterator pollWithBuffer() throws InterruptedException { currentSnapshotSplit.getSplitKeyType(), nameAdjuster, currentSnapshotSplit.getSplitStart(), - currentSnapshotSplit.getSplitEnd()); + currentSnapshotSplit.getSplitEnd(), + statefulTaskContext.getSourceConfig().getChunkKeyCompareMode()); } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index f729f59d05f..11da47c5ec8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -247,7 +247,8 @@ private void createDataEventsForTable( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, - snapshotSplit.getSplitEnd() == null); + snapshotSplit.getSplitEnd() == null, + sourceConfig.getChunkKeyCompareMode()); LOG.info( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 93fa2a0d36a..55ca97845a5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.mysql.source; import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; @@ -312,6 +313,12 @@ public MySqlSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedCh return this; } + /** The compare mode for string chunk key during incremental snapshot. Defaults to 'default'. */ + public MySqlSourceBuilder chunkKeyCompareMode(ChunkKeyCompareMode chunkKeyCompareMode) { + this.configFactory.chunkKeyCompareMode(chunkKeyCompareMode); + return this; + } + /** * Build the {@link MySqlSource}. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index b7efb9ab8ec..123b228c8f7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -152,7 +152,11 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { ChunkUtils.getChunkKeyColumnType( splitColumn, sourceConfig.isTreatTinyInt1AsBoolean()); minMaxOfSplitColumn = - StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); + StatementUtils.queryMinMax( + jdbcConnection, + tableId, + splitColumn.name(), + sourceConfig.getChunkKeyCompareMode()); approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); } catch (Exception e) { throw new RuntimeException("Fail to analyze table in chunk splitter.", e); @@ -184,7 +188,12 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, chunkSize); // may sleep a while to avoid DDOS on MySQL server maySleep(nextChunkId, tableId); - if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) { + if (chunkEnd != null + && ObjectUtils.compare( + chunkEnd, + minMaxOfSplitColumn[1], + sourceConfig.getChunkKeyCompareMode()) + <= 0) { nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd); return createSnapshotSplit( jdbcConnection, @@ -306,7 +315,7 @@ public List splitEvenlySizedChunks( final List splits = new ArrayList<>(); Object chunkStart = null; Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize); - while (ObjectUtils.compare(chunkEnd, max) <= 0) { + while (ObjectUtils.compare(chunkEnd, max, sourceConfig.getChunkKeyCompareMode()) <= 0) { splits.add(ChunkRange.of(chunkStart, chunkEnd)); chunkStart = chunkEnd; try { @@ -322,8 +331,7 @@ public List splitEvenlySizedChunks( return splits; } - @VisibleForTesting - Object nextChunkEnd( + private Object nextChunkEnd( JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, @@ -334,14 +342,22 @@ Object nextChunkEnd( // chunk end might be null when max values are removed Object chunkEnd = StatementUtils.queryNextChunkMax( - jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); - if (chunkEnd == null) { - return null; - } + jdbc, + tableId, + splitColumnName, + chunkSize, + previousChunkEnd, + sourceConfig.getChunkKeyCompareMode()); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = + StatementUtils.queryMin( + jdbc, + tableId, + splitColumnName, + chunkEnd, + sourceConfig.getChunkKeyCompareMode()); // queryMin will return null when the chunkEnd is the max value, // this will happen when the mysql table ignores the capitalization. @@ -355,7 +371,7 @@ Object nextChunkEnd( return null; } } - if (ObjectUtils.compare(chunkEnd, max) >= 0) { + if (ObjectUtils.compare(chunkEnd, max, sourceConfig.getChunkKeyCompareMode()) >= 0) { return null; } else { return chunkEnd; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/ChunkKeyCompareMode.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/ChunkKeyCompareMode.java new file mode 100644 index 00000000000..f492e450557 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/ChunkKeyCompareMode.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.config; + +/** The compare mode for string chunk key during incremental snapshot. */ +public enum ChunkKeyCompareMode { + /** Use the default behavior, which relies on Java's natural String comparison. */ + DEFAULT("default"), + + /** + * Use case-insensitive comparison in Java ({@link String#compareToIgnoreCase}) to align with + * MySQL's case-insensitive collations (e.g., utf8mb4_general_ci). No SQL changes are needed. + */ + CASE_INSENSITIVE("case-insensitive"), + + /** + * Force binary (byte-level) comparison in both MySQL SQL queries and Java ({@link + * String#compareTo}). This makes the comparison semantics consistent regardless of the table's + * collation, but may impact index usage during chunk splitting. + */ + BINARY("binary"); + + private final String value; + + ChunkKeyCompareMode(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static ChunkKeyCompareMode fromValue(String value) { + for (ChunkKeyCompareMode mode : values()) { + if (mode.value.equalsIgnoreCase(value)) { + return mode; + } + } + return DEFAULT; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index cf456fcaed0..92f491cd738 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -72,10 +72,11 @@ public class MySqlSourceConfig implements Serializable { private final boolean parseOnLineSchemaChanges; public static boolean useLegacyJsonFormat = true; private final boolean assignUnboundedChunkFirst; + private final ChunkKeyCompareMode chunkKeyCompareMode; // -------------------------------------------------------------------------------------------- // Debezium Configurations - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------------------------- private final Properties dbzProperties; private final Configuration dbzConfiguration; private final MySqlConnectorConfig dbzMySqlConfig; @@ -112,7 +113,8 @@ public class MySqlSourceConfig implements Serializable { boolean parseOnLineSchemaChanges, boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + ChunkKeyCompareMode chunkKeyCompareMode) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -158,6 +160,7 @@ public class MySqlSourceConfig implements Serializable { this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; + this.chunkKeyCompareMode = chunkKeyCompareMode; } public String getHostname() { @@ -257,6 +260,10 @@ public boolean isAssignUnboundedChunkFirst() { return assignUnboundedChunkFirst; } + public ChunkKeyCompareMode getChunkKeyCompareMode() { + return chunkKeyCompareMode; + } + public Properties getDbzProperties() { return dbzProperties; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 569b62232db..60cee37a3bd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -78,6 +78,10 @@ public class MySqlSourceConfigFactory implements Serializable { private boolean treatTinyInt1AsBoolean = true; private boolean useLegacyJsonFormat = true; private boolean assignUnboundedChunkFirst = false; + private ChunkKeyCompareMode chunkKeyCompareMode = + ChunkKeyCompareMode.fromValue( + MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE + .defaultValue()); public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -341,6 +345,12 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde return this; } + /** The compare mode for string chunk key during incremental snapshot. Defaults to 'default'. */ + public MySqlSourceConfigFactory chunkKeyCompareMode(ChunkKeyCompareMode chunkKeyCompareMode) { + this.chunkKeyCompareMode = chunkKeyCompareMode; + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */ public MySqlSourceConfig createConfig(int subtaskId) { // hard code server name, because we don't need to distinguish it, docs: @@ -444,6 +454,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { parseOnLineSchemaChanges, treatTinyInt1AsBoolean, useLegacyJsonFormat, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + chunkKeyCompareMode); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a8e143f5fc5..7cdb89be1c2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -252,6 +252,19 @@ public class MySqlSourceOptions { "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." + "By default, the chunk key is the first column of the primary key."); + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE = + ConfigOptions.key("scan.incremental.snapshot.string-key.compare-mode") + .stringType() + .defaultValue("default") + .withDescription( + "The compare mode for string chunk key during incremental snapshot. Supported values are: " + + "'default' (uses Java String.compareTo), " + + "'case-insensitive' (uses Java String.compareToIgnoreCase to align with MySQL case-insensitive collations), " + + "'binary' (forces binary comparison in both MySQL SQL and Java). " + + "The 'case-insensitive' mode is recommended for tables using utf8mb4_general_ci or similar case-insensitive collations. " + + "The 'binary' mode ensures consistent byte-level ordering but may impact query performance during chunk splitting."); + @Experimental public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = ConfigOptions.key("scan.incremental.close-idle-reader.enabled") diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java index a3a7281a857..093e193a2da 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ObjectUtils.java @@ -82,9 +82,31 @@ public static BigDecimal minus(Object minuend, Object subtrahend) { */ @SuppressWarnings("unchecked") public static int compare(Object obj1, Object obj2) { + return compare( + obj1, + obj2, + org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode.DEFAULT); + } + + @SuppressWarnings("unchecked") + public static int compare( + Object obj1, + Object obj2, + org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode compareMode) { if (obj1 instanceof Comparable && obj1.getClass().equals(obj2.getClass())) { + if (obj1 instanceof String + && compareMode + == org.apache.flink.cdc.connectors.mysql.source.config + .ChunkKeyCompareMode.CASE_INSENSITIVE) { + return String.CASE_INSENSITIVE_ORDER.compare((String) obj1, (String) obj2); + } return ((Comparable) obj1).compareTo(obj2); } else { + if (compareMode + == org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode + .CASE_INSENSITIVE) { + return obj1.toString().compareToIgnoreCase(obj2.toString()); + } return obj1.toString().compareTo(obj2.toString()); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java index 1f9a041d78f..fec062d4005 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind; import org.apache.flink.cdc.connectors.mysql.debezium.reader.DebeziumReader; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; @@ -100,7 +101,8 @@ public static void upsertBinlog( RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Object[] splitStart, - Object[] splitEnd) { + Object[] splitEnd, + ChunkKeyCompareMode compareMode) { if (isDataChangeRecord(binlogRecord)) { Struct value = (Struct) binlogRecord.value(); if (value != null) { @@ -108,7 +110,8 @@ public static void upsertBinlog( if (SplitKeyUtils.splitKeyRangeContains( SplitKeyUtils.getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct), splitStart, - splitEnd)) { + splitEnd, + compareMode)) { boolean hasPrimaryKey = binlogRecord.key() != null; Envelope.Operation operation = Envelope.Operation.forCode( @@ -137,7 +140,8 @@ public static void upsertBinlog( SplitKeyUtils.getSplitKey( splitBoundaryType, nameAdjuster, structFromAfter), splitStart, - splitEnd)) { + splitEnd, + compareMode)) { LOG.warn( "The updated chunk key is out of the split range. Cannot provide exactly-once semantics."); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java index d19de5df573..27a07118bfd 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SplitKeyUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.source.utils; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo; import org.apache.flink.table.types.logical.RowType; @@ -33,18 +34,24 @@ public class SplitKeyUtils { /** Returns the specific key contains in the split key range or not. */ public static boolean splitKeyRangeContains( Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) { - return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == RangePosition.WITHIN; + return splitKeyRangeContains(key, splitKeyStart, splitKeyEnd, ChunkKeyCompareMode.DEFAULT); } - @SuppressWarnings("unchecked") - private static int compareObjects(Object o1, Object o2) { - if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) { - return ((Comparable) o1).compareTo(o2); - } else if (isNumericObject(o1) && isNumericObject(o2)) { + /** Returns the specific key contains in the split key range or not. */ + public static boolean splitKeyRangeContains( + Object[] key, + Object[] splitKeyStart, + Object[] splitKeyEnd, + ChunkKeyCompareMode compareMode) { + return compareKeyWithRange(key, splitKeyStart, splitKeyEnd, compareMode) + == RangePosition.WITHIN; + } + + private static int compareObjects(Object o1, Object o2, ChunkKeyCompareMode compareMode) { + if (isNumericObject(o1) && isNumericObject(o2)) { return toBigDecimal(o1).compareTo(toBigDecimal(o2)); - } else { - return o1.toString().compareTo(o2.toString()); } + return ObjectUtils.compare(o1, o2, compareMode); } private static boolean isNumericObject(Object obj) { @@ -84,6 +91,26 @@ public static Object[] getSplitKey( * @param splits List of splits to be sorted (sorted in-place) */ public static void sortFinishedSplitInfos(List splits) { + sortFinishedSplitInfos(splits, ChunkKeyCompareMode.DEFAULT); + } + + /** + * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending order. This is + * required for binary search to work correctly. + * + *

    Handles special cases: - Splits with null splitStart are considered as MIN value (sorted + * to front) - Splits with null splitEnd are considered as MAX value (sorted to back) + * + *

    NOTE: Current implementation assumes single-field split keys (as indicated by + * getSplitKey()). If multi-field split keys are supported in the future, the comparison logic + * should be reviewed to ensure consistency with {@link + * #splitKeyRangeContains(Object[],Object[],Object[],ChunkKeyCompareMode)}. + * + * @param splits List of splits to be sorted (sorted in-place) + * @param compareMode The compare mode for string chunk key comparison + */ + public static void sortFinishedSplitInfos( + List splits, ChunkKeyCompareMode compareMode) { if (splits == null || splits.size() <= 1) { return; } @@ -105,7 +132,7 @@ public static void sortFinishedSplitInfos(List splits } // Compare split starts - return compareSplit(leftSplitStart, rightSplitStart); + return compareSplit(leftSplitStart, rightSplitStart, compareMode); }); } @@ -125,6 +152,28 @@ public static void sortFinishedSplitInfos(List splits */ public static FinishedSnapshotSplitInfo findSplitByKeyBinary( List sortedSplits, Object[] key) { + return findSplitByKeyBinary(sortedSplits, key, ChunkKeyCompareMode.DEFAULT); + } + + /** + * Uses binary search to find the split containing the specified key in a sorted split list. + * + *

    IMPORTANT: The splits list MUST be sorted by splitStart before calling this method. Use + * sortFinishedSplitInfos() to sort the list if needed. + * + *

    To leverage data locality for append-heavy workloads (e.g. auto-increment PKs), this + * method checks the first and last splits before applying binary search to the remaining + * subset. + * + * @param sortedSplits List of splits sorted by splitStart (MUST be sorted!) + * @param key The chunk key to search for + * @param compareMode The compare mode for string chunk key comparison + * @return The split containing the key, or null if not found + */ + public static FinishedSnapshotSplitInfo findSplitByKeyBinary( + List sortedSplits, + Object[] key, + ChunkKeyCompareMode compareMode) { if (sortedSplits == null || sortedSplits.isEmpty()) { return null; @@ -134,7 +183,8 @@ public static FinishedSnapshotSplitInfo findSplitByKeyBinary( FinishedSnapshotSplitInfo firstSplit = sortedSplits.get(0); RangePosition firstPosition = - compareKeyWithRange(key, firstSplit.getSplitStart(), firstSplit.getSplitEnd()); + compareKeyWithRange( + key, firstSplit.getSplitStart(), firstSplit.getSplitEnd(), compareMode); if (firstPosition == RangePosition.WITHIN) { return firstSplit; } @@ -147,7 +197,8 @@ public static FinishedSnapshotSplitInfo findSplitByKeyBinary( FinishedSnapshotSplitInfo lastSplit = sortedSplits.get(size - 1); RangePosition lastPosition = - compareKeyWithRange(key, lastSplit.getSplitStart(), lastSplit.getSplitEnd()); + compareKeyWithRange( + key, lastSplit.getSplitStart(), lastSplit.getSplitEnd(), compareMode); if (lastPosition == RangePosition.WITHIN) { return lastSplit; } @@ -166,7 +217,8 @@ public static FinishedSnapshotSplitInfo findSplitByKeyBinary( FinishedSnapshotSplitInfo split = sortedSplits.get(mid); RangePosition position = - compareKeyWithRange(key, split.getSplitStart(), split.getSplitEnd()); + compareKeyWithRange( + key, split.getSplitStart(), split.getSplitEnd(), compareMode); if (position == RangePosition.WITHIN) { return split; @@ -192,29 +244,29 @@ private enum RangePosition { * returns where the key lies relative to that interval. */ private static RangePosition compareKeyWithRange( - Object[] key, Object[] splitStart, Object[] splitEnd) { + Object[] key, Object[] splitStart, Object[] splitEnd, ChunkKeyCompareMode compareMode) { if (splitStart == null) { if (splitEnd == null) { return RangePosition.WITHIN; // Full range split } // key < splitEnd ? - int cmp = compareSplit(key, splitEnd); + int cmp = compareSplit(key, splitEnd, compareMode); return cmp < 0 ? RangePosition.WITHIN : RangePosition.AFTER; } if (splitEnd == null) { // key >= splitStart ? - int cmp = compareSplit(key, splitStart); + int cmp = compareSplit(key, splitStart, compareMode); return cmp >= 0 ? RangePosition.WITHIN : RangePosition.BEFORE; } // Normal case: [splitStart, splitEnd) - int cmpStart = compareSplit(key, splitStart); + int cmpStart = compareSplit(key, splitStart, compareMode); if (cmpStart < 0) { return RangePosition.BEFORE; // key < splitStart } - int cmpEnd = compareSplit(key, splitEnd); + int cmpEnd = compareSplit(key, splitEnd, compareMode); if (cmpEnd >= 0) { return RangePosition.AFTER; // key >= splitEnd } @@ -222,7 +274,8 @@ private static RangePosition compareKeyWithRange( return RangePosition.WITHIN; // splitStart <= key < splitEnd } - private static int compareSplit(Object[] leftSplit, Object[] rightSplit) { + private static int compareSplit( + Object[] leftSplit, Object[] rightSplit, ChunkKeyCompareMode compareMode) { // Ensure both splits have the same length if (leftSplit.length != rightSplit.length) { throw new IllegalArgumentException( @@ -233,7 +286,7 @@ private static int compareSplit(Object[] leftSplit, Object[] rightSplit) { int compareResult = 0; for (int i = 0; i < leftSplit.length; i++) { - compareResult = compareObjects(leftSplit[i], rightSplit[i]); + compareResult = compareObjects(leftSplit[i], rightSplit[i], compareMode); if (compareResult != 0) { break; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java index c910cbfdc37..cf7a40e29a7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.source.utils; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.table.types.logical.RowType; import io.debezium.jdbc.JdbcConnection; @@ -38,12 +39,24 @@ public class StatementUtils { private StatementUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public static Object[] queryMinMax( + JdbcConnection jdbc, + TableId tableId, + String columnName, + ChunkKeyCompareMode compareMode) throws SQLException { - final String minMaxQuery = - String.format( - "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quote(tableId)); + final String minMaxQuery; + if (compareMode == ChunkKeyCompareMode.BINARY) { + minMaxQuery = + String.format( + "SELECT CAST(MIN(BINARY %s) AS CHAR), CAST(MAX(BINARY %s) AS CHAR) FROM %s", + quote(columnName), quote(columnName), quote(tableId)); + } else { + minMaxQuery = + String.format( + "SELECT MIN(%s), MAX(%s) FROM %s", + quote(columnName), quote(columnName), quote(tableId)); + } return jdbc.queryAndMap( minMaxQuery, rs -> { @@ -97,12 +110,24 @@ public static void setSafeObject(PreparedStatement ps, int parameterIndex, Objec } public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, + TableId tableId, + String columnName, + Object excludedLowerBound, + ChunkKeyCompareMode compareMode) throws SQLException { - final String minQuery = - String.format( - "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quote(tableId), quote(columnName)); + final String minQuery; + if (compareMode == ChunkKeyCompareMode.BINARY) { + minQuery = + String.format( + "SELECT CAST(MIN(BINARY %s) AS CHAR) FROM %s WHERE BINARY %s > BINARY ?", + quote(columnName), quote(tableId), quote(columnName)); + } else { + minQuery = + String.format( + "SELECT MIN(%s) FROM %s WHERE %s > ?", + quote(columnName), quote(tableId), quote(columnName)); + } return jdbc.prepareQueryAndMap( minQuery, ps -> setSafeObject(ps, 1, excludedLowerBound), @@ -122,20 +147,36 @@ public static Object queryNextChunkMax( TableId tableId, String splitColumnName, int chunkSize, - Object includedLowerBound) + Object includedLowerBound, + ChunkKeyCompareMode compareMode) throws SQLException { String quotedColumn = quote(splitColumnName); - String query = - String.format( - "SELECT MAX(%s) FROM (" - + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s" - + ") AS T", - quotedColumn, - quotedColumn, - quote(tableId), - quotedColumn, - quotedColumn, - chunkSize); + String query; + if (compareMode == ChunkKeyCompareMode.BINARY) { + query = + String.format( + "SELECT CAST(MAX(BINARY %s) AS CHAR) FROM (" + + "SELECT %s FROM %s WHERE BINARY %s >= BINARY ? ORDER BY BINARY %s ASC LIMIT %s" + + ") AS T", + quotedColumn, + quotedColumn, + quote(tableId), + quotedColumn, + quotedColumn, + chunkSize); + } else { + query = + String.format( + "SELECT MAX(%s) FROM (" + + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + ") AS T", + quotedColumn, + quotedColumn, + quote(tableId), + quotedColumn, + quotedColumn, + chunkSize); + } return jdbc.prepareQueryAndMap( query, ps -> setSafeObject(ps, 1, includedLowerBound), @@ -151,8 +192,13 @@ public static Object queryNextChunkMax( } public static String buildSplitScanQuery( - TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) { - return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true); + TableId tableId, + RowType pkRowType, + boolean isFirstSplit, + boolean isLastSplit, + ChunkKeyCompareMode compareMode) { + return buildSplitQuery( + tableId, pkRowType, isFirstSplit, isLastSplit, -1, true, compareMode); } private static String buildSplitQuery( @@ -161,34 +207,36 @@ private static String buildSplitQuery( boolean isFirstSplit, boolean isLastSplit, int limitSize, - boolean isScanningData) { + boolean isScanningData, + ChunkKeyCompareMode compareMode) { + final boolean isBinaryMode = compareMode == ChunkKeyCompareMode.BINARY; final String condition; if (isFirstSplit && isLastSplit) { condition = null; } else if (isFirstSplit) { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?", isBinaryMode); if (isScanningData) { sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?", isBinaryMode); sql.append(")"); } condition = sql.toString(); } else if (isLastSplit) { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?", isBinaryMode); condition = sql.toString(); } else { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?", isBinaryMode); if (isScanningData) { sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?", isBinaryMode); sql.append(")"); } sql.append(" AND "); - addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?"); + addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?", isBinaryMode); condition = sql.toString(); } @@ -261,10 +309,18 @@ private static PreparedStatement initStatement(JdbcConnection jdbc, String sql, } private static void addPrimaryKeyColumnsToCondition( - RowType pkRowType, StringBuilder sql, String predicate) { + RowType pkRowType, StringBuilder sql, String predicate, boolean isBinaryMode) { for (Iterator fieldNamesIt = pkRowType.getFieldNames().iterator(); fieldNamesIt.hasNext(); ) { - sql.append(quote(fieldNamesIt.next())).append(predicate); + if (isBinaryMode) { + sql.append("BINARY "); + } + sql.append(quote(fieldNamesIt.next())); + if (isBinaryMode) { + sql.append(predicate.replace("?", "BINARY ?")); + } else { + sql.append(predicate); + } if (fieldNamesIt.hasNext()) { sql.append(" AND "); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index 2a1f0519435..d09fedb2624 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.connectors.mysql.source.MySqlSource; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; import org.apache.flink.cdc.debezium.DebeziumSourceFunction; import org.apache.flink.cdc.debezium.table.MetadataConverter; @@ -101,6 +102,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat final boolean parseOnlineSchemaChanges; private final boolean useLegacyJsonFormat; private final boolean assignUnboundedChunkFirst; + private final ChunkKeyCompareMode chunkKeyCompareMode; private final boolean appendOnly; @@ -144,6 +146,7 @@ public MySqlTableSource( boolean parseOnlineSchemaChanges, boolean useLegacyJsonFormat, boolean assignUnboundedChunkFirst, + ChunkKeyCompareMode chunkKeyCompareMode, boolean appendOnly) { this.physicalSchema = physicalSchema; this.port = port; @@ -177,6 +180,7 @@ public MySqlTableSource( this.skipSnapshotBackFill = skipSnapshotBackFill; this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; + this.chunkKeyCompareMode = chunkKeyCompareMode; this.appendOnly = appendOnly; } @@ -241,6 +245,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .parseOnLineSchemaChanges(parseOnlineSchemaChanges) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(assignUnboundedChunkFirst) + .chunkKeyCompareMode(chunkKeyCompareMode) .build(); return SourceProvider.of(parallelSource); } else { @@ -330,6 +335,7 @@ public DynamicTableSource copy() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, + chunkKeyCompareMode, appendOnly); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; @@ -376,6 +382,7 @@ public boolean equals(Object o) { && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges && useLegacyJsonFormat == that.useLegacyJsonFormat && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst + && Objects.equals(chunkKeyCompareMode, that.chunkKeyCompareMode) && Objects.equals(appendOnly, that.appendOnly); } @@ -413,6 +420,7 @@ public int hashCode() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, + chunkKeyCompareMode, appendOnly); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 5ea430d94e7..55779df90fc 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.table; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -106,6 +107,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); boolean assignUnboundedChunkFirst = config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); + ChunkKeyCompareMode chunkKeyCompareMode = + ChunkKeyCompareMode.fromValue( + config.get( + MySqlSourceOptions + .SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE)); boolean appendOnly = config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); @@ -156,6 +162,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { parseOnLineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, + chunkKeyCompareMode, appendOnly); } @@ -205,6 +212,7 @@ public Set> optionalOptions() { options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES); options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); + options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE); options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 01c2dff84da..22a4523f027 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.table; +import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode; import org.apache.flink.cdc.debezium.utils.ResolvedSchemaUtils; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -129,6 +130,7 @@ void testCommonProperties() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -179,6 +181,7 @@ void testEnableParallelReadSource() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -225,6 +228,7 @@ void testEnableParallelReadSourceWithSingleServerId() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -269,6 +273,7 @@ void testEnableParallelReadSourceLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -330,6 +335,7 @@ void testOptionalProperties() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), true, SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource) .isEqualTo(expectedSource) @@ -389,6 +395,7 @@ void testStartupFromSpecificOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -431,6 +438,7 @@ void testStartupFromInitial() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -474,6 +482,7 @@ void testStartupFromEarliestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -518,6 +527,7 @@ void testStartupFromSpecificTimestamp() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -560,6 +570,7 @@ void testStartupFromLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -607,6 +618,7 @@ void testMetadataColumns() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + ChunkKeyCompareMode.DEFAULT, false); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); @@ -810,6 +822,7 @@ void testEnablingExperimentalOptions() { true, true, true, + ChunkKeyCompareMode.DEFAULT, false); Assertions.assertThat(actualSource).isEqualTo(expectedSource); }