splits) {
+ sortFinishedSplitInfos(splits, ChunkKeyCompareMode.DEFAULT);
+ }
+
+ /**
+ * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending order. This is
+ * required for binary search to work correctly.
+ *
+ * Handles special cases: - Splits with null splitStart are considered as MIN value (sorted
+ * to front) - Splits with null splitEnd are considered as MAX value (sorted to back)
+ *
+ *
NOTE: Current implementation assumes single-field split keys (as indicated by
+ * getSplitKey()). If multi-field split keys are supported in the future, the comparison logic
+ * should be reviewed to ensure consistency with {@link
+ * #splitKeyRangeContains(Object[],Object[],Object[],ChunkKeyCompareMode)}.
+ *
+ * @param splits List of splits to be sorted (sorted in-place)
+ * @param compareMode The compare mode for string chunk key comparison
+ */
+ public static void sortFinishedSplitInfos(
+ List splits, ChunkKeyCompareMode compareMode) {
if (splits == null || splits.size() <= 1) {
return;
}
@@ -105,7 +132,7 @@ public static void sortFinishedSplitInfos(List splits
}
// Compare split starts
- return compareSplit(leftSplitStart, rightSplitStart);
+ return compareSplit(leftSplitStart, rightSplitStart, compareMode);
});
}
@@ -125,6 +152,28 @@ public static void sortFinishedSplitInfos(List splits
*/
public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
List sortedSplits, Object[] key) {
+ return findSplitByKeyBinary(sortedSplits, key, ChunkKeyCompareMode.DEFAULT);
+ }
+
+ /**
+ * Uses binary search to find the split containing the specified key in a sorted split list.
+ *
+ * IMPORTANT: The splits list MUST be sorted by splitStart before calling this method. Use
+ * sortFinishedSplitInfos() to sort the list if needed.
+ *
+ *
To leverage data locality for append-heavy workloads (e.g. auto-increment PKs), this
+ * method checks the first and last splits before applying binary search to the remaining
+ * subset.
+ *
+ * @param sortedSplits List of splits sorted by splitStart (MUST be sorted!)
+ * @param key The chunk key to search for
+ * @param compareMode The compare mode for string chunk key comparison
+ * @return The split containing the key, or null if not found
+ */
+ public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
+ List sortedSplits,
+ Object[] key,
+ ChunkKeyCompareMode compareMode) {
if (sortedSplits == null || sortedSplits.isEmpty()) {
return null;
@@ -134,7 +183,8 @@ public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
FinishedSnapshotSplitInfo firstSplit = sortedSplits.get(0);
RangePosition firstPosition =
- compareKeyWithRange(key, firstSplit.getSplitStart(), firstSplit.getSplitEnd());
+ compareKeyWithRange(
+ key, firstSplit.getSplitStart(), firstSplit.getSplitEnd(), compareMode);
if (firstPosition == RangePosition.WITHIN) {
return firstSplit;
}
@@ -147,7 +197,8 @@ public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
FinishedSnapshotSplitInfo lastSplit = sortedSplits.get(size - 1);
RangePosition lastPosition =
- compareKeyWithRange(key, lastSplit.getSplitStart(), lastSplit.getSplitEnd());
+ compareKeyWithRange(
+ key, lastSplit.getSplitStart(), lastSplit.getSplitEnd(), compareMode);
if (lastPosition == RangePosition.WITHIN) {
return lastSplit;
}
@@ -166,7 +217,8 @@ public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
FinishedSnapshotSplitInfo split = sortedSplits.get(mid);
RangePosition position =
- compareKeyWithRange(key, split.getSplitStart(), split.getSplitEnd());
+ compareKeyWithRange(
+ key, split.getSplitStart(), split.getSplitEnd(), compareMode);
if (position == RangePosition.WITHIN) {
return split;
@@ -192,29 +244,29 @@ private enum RangePosition {
* returns where the key lies relative to that interval.
*/
private static RangePosition compareKeyWithRange(
- Object[] key, Object[] splitStart, Object[] splitEnd) {
+ Object[] key, Object[] splitStart, Object[] splitEnd, ChunkKeyCompareMode compareMode) {
if (splitStart == null) {
if (splitEnd == null) {
return RangePosition.WITHIN; // Full range split
}
// key < splitEnd ?
- int cmp = compareSplit(key, splitEnd);
+ int cmp = compareSplit(key, splitEnd, compareMode);
return cmp < 0 ? RangePosition.WITHIN : RangePosition.AFTER;
}
if (splitEnd == null) {
// key >= splitStart ?
- int cmp = compareSplit(key, splitStart);
+ int cmp = compareSplit(key, splitStart, compareMode);
return cmp >= 0 ? RangePosition.WITHIN : RangePosition.BEFORE;
}
// Normal case: [splitStart, splitEnd)
- int cmpStart = compareSplit(key, splitStart);
+ int cmpStart = compareSplit(key, splitStart, compareMode);
if (cmpStart < 0) {
return RangePosition.BEFORE; // key < splitStart
}
- int cmpEnd = compareSplit(key, splitEnd);
+ int cmpEnd = compareSplit(key, splitEnd, compareMode);
if (cmpEnd >= 0) {
return RangePosition.AFTER; // key >= splitEnd
}
@@ -222,7 +274,8 @@ private static RangePosition compareKeyWithRange(
return RangePosition.WITHIN; // splitStart <= key < splitEnd
}
- private static int compareSplit(Object[] leftSplit, Object[] rightSplit) {
+ private static int compareSplit(
+ Object[] leftSplit, Object[] rightSplit, ChunkKeyCompareMode compareMode) {
// Ensure both splits have the same length
if (leftSplit.length != rightSplit.length) {
throw new IllegalArgumentException(
@@ -233,7 +286,7 @@ private static int compareSplit(Object[] leftSplit, Object[] rightSplit) {
int compareResult = 0;
for (int i = 0; i < leftSplit.length; i++) {
- compareResult = compareObjects(leftSplit[i], rightSplit[i]);
+ compareResult = compareObjects(leftSplit[i], rightSplit[i], compareMode);
if (compareResult != 0) {
break;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
index c910cbfdc37..cf7a40e29a7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql.source.utils;
+import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
import org.apache.flink.table.types.logical.RowType;
import io.debezium.jdbc.JdbcConnection;
@@ -38,12 +39,24 @@ public class StatementUtils {
private StatementUtils() {}
- public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
+ public static Object[] queryMinMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ ChunkKeyCompareMode compareMode)
throws SQLException {
- final String minMaxQuery =
- String.format(
- "SELECT MIN(%s), MAX(%s) FROM %s",
- quote(columnName), quote(columnName), quote(tableId));
+ final String minMaxQuery;
+ if (compareMode == ChunkKeyCompareMode.BINARY) {
+ minMaxQuery =
+ String.format(
+ "SELECT CAST(MIN(BINARY %s) AS CHAR), CAST(MAX(BINARY %s) AS CHAR) FROM %s",
+ quote(columnName), quote(columnName), quote(tableId));
+ } else {
+ minMaxQuery =
+ String.format(
+ "SELECT MIN(%s), MAX(%s) FROM %s",
+ quote(columnName), quote(columnName), quote(tableId));
+ }
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
@@ -97,12 +110,24 @@ public static void setSafeObject(PreparedStatement ps, int parameterIndex, Objec
}
public static Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ Object excludedLowerBound,
+ ChunkKeyCompareMode compareMode)
throws SQLException {
- final String minQuery =
- String.format(
- "SELECT MIN(%s) FROM %s WHERE %s > ?",
- quote(columnName), quote(tableId), quote(columnName));
+ final String minQuery;
+ if (compareMode == ChunkKeyCompareMode.BINARY) {
+ minQuery =
+ String.format(
+ "SELECT CAST(MIN(BINARY %s) AS CHAR) FROM %s WHERE BINARY %s > BINARY ?",
+ quote(columnName), quote(tableId), quote(columnName));
+ } else {
+ minQuery =
+ String.format(
+ "SELECT MIN(%s) FROM %s WHERE %s > ?",
+ quote(columnName), quote(tableId), quote(columnName));
+ }
return jdbc.prepareQueryAndMap(
minQuery,
ps -> setSafeObject(ps, 1, excludedLowerBound),
@@ -122,20 +147,36 @@ public static Object queryNextChunkMax(
TableId tableId,
String splitColumnName,
int chunkSize,
- Object includedLowerBound)
+ Object includedLowerBound,
+ ChunkKeyCompareMode compareMode)
throws SQLException {
String quotedColumn = quote(splitColumnName);
- String query =
- String.format(
- "SELECT MAX(%s) FROM ("
- + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
- + ") AS T",
- quotedColumn,
- quotedColumn,
- quote(tableId),
- quotedColumn,
- quotedColumn,
- chunkSize);
+ String query;
+ if (compareMode == ChunkKeyCompareMode.BINARY) {
+ query =
+ String.format(
+ "SELECT CAST(MAX(BINARY %s) AS CHAR) FROM ("
+ + "SELECT %s FROM %s WHERE BINARY %s >= BINARY ? ORDER BY BINARY %s ASC LIMIT %s"
+ + ") AS T",
+ quotedColumn,
+ quotedColumn,
+ quote(tableId),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ } else {
+ query =
+ String.format(
+ "SELECT MAX(%s) FROM ("
+ + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ + ") AS T",
+ quotedColumn,
+ quotedColumn,
+ quote(tableId),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ }
return jdbc.prepareQueryAndMap(
query,
ps -> setSafeObject(ps, 1, includedLowerBound),
@@ -151,8 +192,13 @@ public static Object queryNextChunkMax(
}
public static String buildSplitScanQuery(
- TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) {
- return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true);
+ TableId tableId,
+ RowType pkRowType,
+ boolean isFirstSplit,
+ boolean isLastSplit,
+ ChunkKeyCompareMode compareMode) {
+ return buildSplitQuery(
+ tableId, pkRowType, isFirstSplit, isLastSplit, -1, true, compareMode);
}
private static String buildSplitQuery(
@@ -161,34 +207,36 @@ private static String buildSplitQuery(
boolean isFirstSplit,
boolean isLastSplit,
int limitSize,
- boolean isScanningData) {
+ boolean isScanningData,
+ ChunkKeyCompareMode compareMode) {
+ final boolean isBinaryMode = compareMode == ChunkKeyCompareMode.BINARY;
final String condition;
if (isFirstSplit && isLastSplit) {
condition = null;
} else if (isFirstSplit) {
final StringBuilder sql = new StringBuilder();
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?", isBinaryMode);
if (isScanningData) {
sql.append(" AND NOT (");
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?", isBinaryMode);
sql.append(")");
}
condition = sql.toString();
} else if (isLastSplit) {
final StringBuilder sql = new StringBuilder();
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?", isBinaryMode);
condition = sql.toString();
} else {
final StringBuilder sql = new StringBuilder();
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " >= ?", isBinaryMode);
if (isScanningData) {
sql.append(" AND NOT (");
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " = ?", isBinaryMode);
sql.append(")");
}
sql.append(" AND ");
- addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?");
+ addPrimaryKeyColumnsToCondition(pkRowType, sql, " <= ?", isBinaryMode);
condition = sql.toString();
}
@@ -261,10 +309,18 @@ private static PreparedStatement initStatement(JdbcConnection jdbc, String sql,
}
private static void addPrimaryKeyColumnsToCondition(
- RowType pkRowType, StringBuilder sql, String predicate) {
+ RowType pkRowType, StringBuilder sql, String predicate, boolean isBinaryMode) {
for (Iterator fieldNamesIt = pkRowType.getFieldNames().iterator();
fieldNamesIt.hasNext(); ) {
- sql.append(quote(fieldNamesIt.next())).append(predicate);
+ if (isBinaryMode) {
+ sql.append("BINARY ");
+ }
+ sql.append(quote(fieldNamesIt.next()));
+ if (isBinaryMode) {
+ sql.append(predicate.replace("?", "BINARY ?"));
+ } else {
+ sql.append(predicate);
+ }
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 2a1f0519435..d09fedb2624 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
+import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
@@ -101,6 +102,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
final boolean parseOnlineSchemaChanges;
private final boolean useLegacyJsonFormat;
private final boolean assignUnboundedChunkFirst;
+ private final ChunkKeyCompareMode chunkKeyCompareMode;
private final boolean appendOnly;
@@ -144,6 +146,7 @@ public MySqlTableSource(
boolean parseOnlineSchemaChanges,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst,
+ ChunkKeyCompareMode chunkKeyCompareMode,
boolean appendOnly) {
this.physicalSchema = physicalSchema;
this.port = port;
@@ -177,6 +180,7 @@ public MySqlTableSource(
this.skipSnapshotBackFill = skipSnapshotBackFill;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+ this.chunkKeyCompareMode = chunkKeyCompareMode;
this.appendOnly = appendOnly;
}
@@ -241,6 +245,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.parseOnLineSchemaChanges(parseOnlineSchemaChanges)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
+ .chunkKeyCompareMode(chunkKeyCompareMode)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -330,6 +335,7 @@ public DynamicTableSource copy() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
+ chunkKeyCompareMode,
appendOnly);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
@@ -376,6 +382,7 @@ public boolean equals(Object o) {
&& parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
&& useLegacyJsonFormat == that.useLegacyJsonFormat
&& assignUnboundedChunkFirst == that.assignUnboundedChunkFirst
+ && Objects.equals(chunkKeyCompareMode, that.chunkKeyCompareMode)
&& Objects.equals(appendOnly, that.appendOnly);
}
@@ -413,6 +420,7 @@ public int hashCode() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
+ chunkKeyCompareMode,
appendOnly);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index 5ea430d94e7..55779df90fc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql.table;
+import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -106,6 +107,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
boolean assignUnboundedChunkFirst =
config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
+ ChunkKeyCompareMode chunkKeyCompareMode =
+ ChunkKeyCompareMode.fromValue(
+ config.get(
+ MySqlSourceOptions
+ .SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE));
boolean appendOnly =
config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
@@ -156,6 +162,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
parseOnLineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
+ chunkKeyCompareMode,
appendOnly);
}
@@ -205,6 +212,7 @@ public Set> optionalOptions() {
options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
+ options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE);
options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 01c2dff84da..22a4523f027 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.mysql.table;
+import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
import org.apache.flink.cdc.debezium.utils.ResolvedSchemaUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -129,6 +130,7 @@ void testCommonProperties() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -179,6 +181,7 @@ void testEnableParallelReadSource() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -225,6 +228,7 @@ void testEnableParallelReadSourceWithSingleServerId() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -269,6 +273,7 @@ void testEnableParallelReadSourceLatestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -330,6 +335,7 @@ void testOptionalProperties() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
true,
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource)
.isEqualTo(expectedSource)
@@ -389,6 +395,7 @@ void testStartupFromSpecificOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -431,6 +438,7 @@ void testStartupFromInitial() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -474,6 +482,7 @@ void testStartupFromEarliestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -518,6 +527,7 @@ void testStartupFromSpecificTimestamp() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -560,6 +570,7 @@ void testStartupFromLatestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -607,6 +618,7 @@ void testMetadataColumns() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
+ ChunkKeyCompareMode.DEFAULT,
false);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
@@ -810,6 +822,7 @@ void testEnablingExperimentalOptions() {
true,
true,
true,
+ ChunkKeyCompareMode.DEFAULT,
false);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}