13 changes: 13 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
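@@ -441,6 +441,19 @@ Flink SQL> SELECT * FROM orders;
If backfill is skipped, changes made to captured tables during the snapshot phase are replayed later in the binlog reading phase instead of being merged into the snapshot.<br>
WARNING: Skipping backfill may lead to data inconsistency, because some binlog events that occurred during the snapshot phase may be replayed (only at-least-once semantics are guaranteed).
For example, updating a value that was already updated in the snapshot, or deleting data that was already deleted in the snapshot. These replayed binlog events should be handled specially.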
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.string-key.compare-mode</td>
Contributor

We can use SHOW VARIABLES LIKE "collation_server" to query the collation in use from MySQL, which avoids introducing a new configuration parameter.

Author

Good point, but collation_server is just the server default — the actual table or column can override it. To auto-detect properly we'd have to query information_schema.COLUMNS per table at startup, and it gets messy with composite PKs where each column might have a different collation.
So I went with explicit config for now; the default keeps backward compat and won't break anyone. We can add an auto mode later: detect each table's chunk-key collation during snapshot, store it per-table in the split state, and handle each table individually.
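
For illustration only, the per-column mapping that such an auto mode would need could look roughly like the sketch below. It relies on MySQL's collation naming convention (`_ci` suffix for case-insensitive, `_bin` suffix or `binary` for binary). The class `CollationModeSketch` and the method `modeForCollation` are hypothetical and not part of this PR; the returned strings are the three option values documented here.

```java
import java.util.Locale;

/** Hypothetical helper, not part of this PR: maps a MySQL collation name to a compare-mode value. */
final class CollationModeSketch {

    /**
     * Assumes the collation name was read per column, e.g. from
     * information_schema.COLUMNS.COLLATION_NAME (null for non-string columns).
     */
    static String modeForCollation(String collation) {
        if (collation == null) {
            return "default";
        }
        String name = collation.toLowerCase(Locale.ROOT);
        if (name.equals("binary") || name.endsWith("_bin")) {
            return "binary";
        }
        if (name.endsWith("_ci")) {
            return "case-insensitive";
        }
        // Case-sensitive (_cs) and unrecognized collations fall back to the default mode.
        return "default";
    }

    public static void main(String[] args) {
        System.out.println(modeForCollation("utf8mb4_general_ci"));
        System.out.println(modeForCollation("utf8mb4_bin"));
        System.out.println(modeForCollation("utf8mb4_0900_as_cs"));
    }
}
```

A suffix check like this only captures case sensitivity. Many `_ci` collations also treat accented characters as equal, which `compareToIgnoreCase` does not reproduce, so a real auto mode would probably need more than this.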

<td>optional</td>
<td style="word-wrap: break-word;">default</td>
<td>String</td>
<td>
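The comparison mode for string-type chunk keys during the incremental snapshot phase. Available values are:
<li><code>default</code>: Uses Java's default string comparison (case-sensitive, UTF-16 code unit order, as in <code>String.compareTo</code>). This may be inconsistent with MySQL's case-insensitive collations such as <code>utf8mb4_general_ci</code>.</li>
<li><code>case-insensitive</code>: Uses case-insensitive comparison (<code>compareToIgnoreCase</code> in Java). Suitable when MySQL uses a case-insensitive collation such as <code>utf8mb4_general_ci</code>.</li>
<li><code>binary</code>: Uses binary comparison in both SQL queries and Java comparison, forcing a byte-level exact match. Suitable when MySQL uses a binary collation such as <code>utf8mb4_bin</code>, or the <code>BINARY</code> keyword.</li>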
</td>
</tr>
<tr>
<td>use.legacy.json.format</td>
13 changes: 13 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
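@@ -340,6 +340,19 @@ pipeline:
If backfill is skipped, changes made to captured tables during the snapshot phase are replayed later in the binlog reading phase instead of being merged into the snapshot.<br>
WARNING: Skipping backfill may lead to data inconsistency, because some binlog events that occurred during the snapshot phase may be replayed (only at-least-once semantics are guaranteed).
For example, updating a value that was already updated in the snapshot, or deleting data that was already deleted in the snapshot. These replayed binlog events should be handled specially.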
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.string-key.compare-mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">default</td>
<td>String</td>
<td>
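The comparison mode for string-type chunk keys during the incremental snapshot phase. Available values are:
<li><code>default</code>: Uses Java's default string comparison (case-sensitive, UTF-16 code unit order, as in <code>String.compareTo</code>). This may be inconsistent with MySQL's case-insensitive collations such as <code>utf8mb4_general_ci</code>.</li>
<li><code>case-insensitive</code>: Uses case-insensitive comparison (<code>compareToIgnoreCase</code> in Java). Suitable when MySQL uses a case-insensitive collation such as <code>utf8mb4_general_ci</code>.</li>
<li><code>binary</code>: Uses binary comparison in both SQL queries and Java comparison, forcing a byte-level exact match. Suitable when MySQL uses a binary collation such as <code>utf8mb4_bin</code>, or the <code>BINARY</code> keyword.</li>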
</td>
</tr>
<tr>
<td>metadata.list</td>
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
@@ -468,6 +468,18 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
For example, updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.string-key.compare-mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">default</td>
<td>String</td>
<td>
The comparison mode for string-type chunk keys during the incremental snapshot phase. Available values are:
<li><code>default</code>: Uses Java's default string comparison (case-sensitive, UTF-16 code unit order, as in <code>String.compareTo</code>). This may be inconsistent with MySQL's case-insensitive collations such as <code>utf8mb4_general_ci</code>.</li>
<li><code>case-insensitive</code>: Uses case-insensitive comparison (<code>compareToIgnoreCase</code> in Java). Suitable when MySQL uses a case-insensitive collation such as <code>utf8mb4_general_ci</code>.</li>
<li><code>binary</code>: Uses binary comparison in both SQL queries and Java comparison, forcing a byte-level exact match. Suitable when MySQL uses a binary collation such as <code>utf8mb4_bin</code>, or the <code>BINARY</code> keyword.</li>
</td>
</tr>
</tbody>
</table>
</div>
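
To make the inconsistency noted for the `default` mode concrete, here is a minimal, self-contained sketch. It assumes a chunk whose boundaries MySQL computed under a case-insensitive collation, and checks whether a key falls inside that chunk using the two Java orderings. The class `ChunkRangeDemo` and the method `inRange` are hypothetical names for illustration, not connector code.

```java
import java.util.Comparator;

/** Illustrative only: class and method names are hypothetical, not part of the connector. */
final class ChunkRangeDemo {

    /** Returns true when start <= key < end under the given ordering. */
    static boolean inRange(String key, String start, String end, Comparator<String> order) {
        return order.compare(key, start) >= 0 && order.compare(key, end) < 0;
    }

    public static void main(String[] args) {
        // Under a case-insensitive collation MySQL sorts apple < Banana < cherry,
        // so a chunk [apple, cherry) computed by MySQL contains "Banana".
        Comparator<String> javaDefault = Comparator.naturalOrder(); // String.compareTo
        Comparator<String> ignoreCase = String.CASE_INSENSITIVE_ORDER; // compareToIgnoreCase

        // Uppercase 'B' sorts before lowercase 'a' in String.compareTo,
        // so the default ordering places "Banana" outside the chunk.
        System.out.println(inRange("Banana", "apple", "cherry", javaDefault)); // false
        System.out.println(inRange("Banana", "apple", "cherry", ignoreCase)); // true
    }
}
```

With the default ordering, a change event for `Banana` would not be matched to the chunk that MySQL assigned it to.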
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
@@ -362,6 +362,18 @@ pipeline:
For example, updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.string-key.compare-mode</td>
<td>optional</td>
<td style="word-wrap: break-word;">default</td>
<td>String</td>
<td>
The comparison mode for string-type chunk keys during the incremental snapshot phase. Available values are:
<li><code>default</code>: Uses Java's default string comparison (case-sensitive, UTF-16 code unit order, as in <code>String.compareTo</code>). This may be inconsistent with MySQL's case-insensitive collations such as <code>utf8mb4_general_ci</code>.</li>
<li><code>case-insensitive</code>: Uses case-insensitive comparison (<code>compareToIgnoreCase</code> in Java). Suitable when MySQL uses a case-insensitive collation such as <code>utf8mb4_general_ci</code>.</li>
<li><code>binary</code>: Uses binary comparison in both SQL queries and Java comparison, forcing a byte-level exact match. Suitable when MySQL uses a binary collation such as <code>utf8mb4_bin</code>, or the <code>BINARY</code> keyword.</li>
</td>
</tr>
<tr>
<td>metadata.list</td>
<td>optional</td>
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -80,6 +81,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
@@ -167,6 +169,9 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
ChunkKeyCompareMode chunkKeyCompareMode =
ChunkKeyCompareMode.fromValue(
config.get(SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE));

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,6 +225,7 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.chunkKeyCompareMode(chunkKeyCompareMode)
.skipSnapshotBackfill(skipSnapshotBackfill);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -358,6 +364,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE);
return options;
}

Original file line number Diff line number Diff line change
@@ -224,6 +224,19 @@ public class MySqlDataSourceOptions {
+ "By default, the chunk key is the first column of the primary key."
+ "eg. db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2;");

@Experimental
public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE =
ConfigOptions.key("scan.incremental.snapshot.string-key.compare-mode")
.stringType()
.defaultValue("default")
.withDescription(
"The compare mode for string chunk key during incremental snapshot. Supported values are: "
+ "'default' (uses Java String.compareTo), "
+ "'case-insensitive' (uses Java String.compareToIgnoreCase to align with MySQL case-insensitive collations), "
+ "'binary' (forces binary comparison in both MySQL SQL and Java). "
+ "The 'case-insensitive' mode is recommended for tables using utf8mb4_general_ci or similar case-insensitive collations. "
+ "The 'binary' mode ensures consistent byte-level ordering but may impact query performance during chunk splitting.");

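The three values named in the option description could map onto an enum along the following lines. This is only a sketch of the idea, not the PR's `ChunkKeyCompareMode`: the enum name `CompareModeSketch`, its constant names, the exact-match parsing in `fromValue`, and the choice of UTF-8 byte comparison for `binary` are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for ChunkKeyCompareMode; names and binary semantics are assumptions. */
enum CompareModeSketch {
    DEFAULT("default"),
    CASE_INSENSITIVE("case-insensitive"),
    BINARY("binary");

    private final String value;

    CompareModeSketch(String value) {
        this.value = value;
    }

    /** Parses an option value; rejects anything other than the three documented strings. */
    static CompareModeSketch fromValue(String value) {
        for (CompareModeSketch mode : values()) {
            if (mode.value.equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported compare mode: " + value);
    }

    /** Compares two string keys under this mode. */
    int compare(String a, String b) {
        switch (this) {
            case CASE_INSENSITIVE:
                return a.compareToIgnoreCase(b);
            case BINARY:
                // Assumption: binary mode is modeled here as unsigned UTF-8 byte order.
                return compareUnsignedBytes(
                        a.getBytes(StandardCharsets.UTF_8), b.getBytes(StandardCharsets.UTF_8));
            default:
                return a.compareTo(b);
        }
    }

    private static int compareUnsignedBytes(byte[] x, byte[] y) {
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int diff = (x[i] & 0xFF) - (y[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return x.length - y.length;
    }
}
```

For example, `CompareModeSketch.fromValue("case-insensitive").compare("B", "a")` is positive, while the same call under `DEFAULT` is negative.
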
@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
@@ -93,6 +94,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
new StoppableChangeEventSourceContext();
private final boolean isParsingOnLineSchemaChanges;
private final boolean isBackfillSkipped;
private final ChunkKeyCompareMode chunkKeyCompareMode;
private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;

private static final long READER_CLOSE_TIMEOUT = 30L;
@@ -116,6 +118,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
this.isParsingOnLineSchemaChanges =
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
this.chunkKeyCompareMode = statefulTaskContext.getSourceConfig().getChunkKeyCompareMode();
this.pendingSchemaChangeEvents = new HashMap<>();
}

@@ -316,7 +319,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) {

FinishedSnapshotSplitInfo matchedSplit =
SplitKeyUtils.findSplitByKeyBinary(
finishedSplitsInfo.get(tableId), chunkKey);
finishedSplitsInfo.get(tableId), chunkKey, chunkKeyCompareMode);

return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark());
}
@@ -381,7 +384,12 @@ private void configureFilter() {
}
// Sort splits by splitStart for binary search optimization
// Binary search requires sorted data to work correctly
splitsInfoMap.values().forEach(SplitKeyUtils::sortFinishedSplitInfos);
splitsInfoMap
.values()
.forEach(
splits ->
SplitKeyUtils.sortFinishedSplitInfos(
splits, chunkKeyCompareMode));
}
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
Original file line number Diff line number Diff line change
@@ -360,7 +360,8 @@ public Iterator<SourceRecords> pollWithBuffer() throws InterruptedException {
currentSnapshotSplit.getSplitKeyType(),
nameAdjuster,
currentSnapshotSplit.getSplitStart(),
currentSnapshotSplit.getSplitEnd());
currentSnapshotSplit.getSplitEnd(),
statefulTaskContext.getSourceConfig().getChunkKeyCompareMode());
}
}
}
Original file line number Diff line number Diff line change
@@ -247,7 +247,8 @@ private void createDataEventsForTable(
snapshotSplit.getTableId(),
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null);
snapshotSplit.getSplitEnd() == null,
sourceConfig.getChunkKeyCompareMode());
LOG.info(
"For split '{}' of table {} using select statement: '{}'",
snapshotSplit.splitId(),
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.mysql.source;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
@@ -312,6 +313,12 @@ public MySqlSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedCh
return this;
}

/** The compare mode for string chunk key during incremental snapshot. Defaults to 'default'. */
public MySqlSourceBuilder<T> chunkKeyCompareMode(ChunkKeyCompareMode chunkKeyCompareMode) {
this.configFactory.chunkKeyCompareMode(chunkKeyCompareMode);
return this;
}

/**
* Build the {@link MySqlSource}.
*
Original file line number Diff line number Diff line change
@@ -152,7 +152,11 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) {
ChunkUtils.getChunkKeyColumnType(
splitColumn, sourceConfig.isTreatTinyInt1AsBoolean());
minMaxOfSplitColumn =
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name());
StatementUtils.queryMinMax(
jdbcConnection,
tableId,
splitColumn.name(),
sourceConfig.getChunkKeyCompareMode());
approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId);
} catch (Exception e) {
throw new RuntimeException("Fail to analyze table in chunk splitter.", e);
@@ -184,7 +188,12 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition,
chunkSize);
// may sleep a while to avoid DDOS on MySQL server
maySleep(nextChunkId, tableId);
if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) {
if (chunkEnd != null
&& ObjectUtils.compare(
chunkEnd,
minMaxOfSplitColumn[1],
sourceConfig.getChunkKeyCompareMode())
<= 0) {
nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
return createSnapshotSplit(
jdbcConnection,
@@ -306,7 +315,7 @@ public List<ChunkRange> splitEvenlySizedChunks(
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = ObjectUtils.plus(min, dynamicChunkSize);
while (ObjectUtils.compare(chunkEnd, max) <= 0) {
while (ObjectUtils.compare(chunkEnd, max, sourceConfig.getChunkKeyCompareMode()) <= 0) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
chunkStart = chunkEnd;
try {
@@ -322,8 +331,7 @@ public List<ChunkRange> splitEvenlySizedChunks(
return splits;
}

@VisibleForTesting
Object nextChunkEnd(
private Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
@@ -334,14 +342,22 @@ Object nextChunkEnd(
// chunk end might be null when max values are removed
Object chunkEnd =
StatementUtils.queryNextChunkMax(
jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
if (chunkEnd == null) {
return null;
}
jdbc,
tableId,
splitColumnName,
chunkSize,
previousChunkEnd,
sourceConfig.getChunkKeyCompareMode());
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd =
StatementUtils.queryMin(
jdbc,
tableId,
splitColumnName,
chunkEnd,
sourceConfig.getChunkKeyCompareMode());

// queryMin will return null when the chunkEnd is the max value,
// this will happen when the mysql table ignores the capitalization.
@@ -355,7 +371,7 @@ Object nextChunkEnd(
return null;
}
}
if (ObjectUtils.compare(chunkEnd, max) >= 0) {
if (ObjectUtils.compare(chunkEnd, max, sourceConfig.getChunkKeyCompareMode()) >= 0) {
return null;
} else {
return chunkEnd;