diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index dc487c1033c5..97f057ddd251 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED; @@ -302,19 +303,19 @@ public static String combinedModeTableList( // be excluded by excluding pattern at the same time String includingPattern = String.format("(%s)\\.(%s)", databasePattern, includingTablePattern); - + LOG.info("Combined mode including pattern is {}", includingPattern); if (excludedTables.isEmpty()) { return includingPattern; } + // Use Pattern.quote to escape special regex characters (e.g., '$' in mysql table name) + // in database and table names, ensuring exact literal matching in the regex. String excludingPattern = excludedTables.stream() - .map( - t -> - String.format( - "(^%s$)", - t.getDatabaseName() + "\\." + t.getObjectName())) + .map(Identifier::getFullName) + .map(n -> String.format("(^%s$)", Pattern.quote(n))) .collect(Collectors.joining("|")); + LOG.info("Combined mode excluding pattern is {}", excludingPattern); excludingPattern = "?!" + excludingPattern; return String.format("(%s)(%s)", excludingPattern, includingPattern); }