[Flink-38911][cdc connector mysql] mysql-cdc-connector datastream support scan.binlog.newly-added-table.enabled #4246

Open
ThorneANN wants to merge 6 commits into apache:master from ThorneANN:FLINK-38911-binlog-added-new-table

@ThorneANN
Contributor

This commit implements binlog-only capture of newly added tables for the MySQL CDC DataStream connector, allowing dynamic table discovery without a snapshot phase.

Key changes:

- Add new config option 'scan.binlog.newly-added-table.enabled' in MySqlSourceOptions
- Add scanBinlogNewlyAddedTableEnabled field and getter in MySqlSourceConfig
- Implement table pattern conversion from Flink CDC style to Debezium style in MySqlSourceConfigFactory
- Add validation logic to ensure binlog-only mode works only with stream-only startup modes
- Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern
- Add logging in MySqlSnapshotSplitAssigner for binlog-only mode
- Expose scanBinlogNewlyAddedTableEnabled() API in MySqlSourceBuilder

The feature converts table patterns (e.g., "db.table_\.*") to Debezium regex style (e.g., "db\.table_.*")
and enables dynamic table discovery during the binlog reading phase without triggering snapshots.
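
For readers following the pattern conversion, here is a minimal, self-contained sketch of how the example above ("db.table_\.*" to "db\.table_.*") could be produced. It is not the code in MySqlSourceConfigFactory: the class name, method name, and placeholder token are hypothetical, and steps 3 and 4 are inferred from the example in this description rather than taken from the diff.

```java
import java.util.regex.Pattern;

// Hypothetical sketch; not the PR's actual implementation.
final class TablePatternSketch {

    // Assumed placeholder token; it must not contain '.' or ','.
    private static final String ESCAPED_DOT_PLACEHOLDER = "$ESCAPED_DOT$";

    static String toDebeziumStyle(String flinkCdcPattern) {
        // Step 1: a comma-separated list becomes a regex alternation.
        String s = flinkCdcPattern.replace(",", "|");
        // Step 2: park every "\." so the next step does not touch it.
        s = s.replace("\\.", ESCAPED_DOT_PLACEHOLDER);
        // Step 3 (assumed): remaining dots are database/table separators,
        // so they become literal dots in the output regex.
        s = s.replace(".", "\\.");
        // Step 4 (assumed): restore the parked tokens as the regex wildcard '.'.
        return s.replace(ESCAPED_DOT_PLACEHOLDER, ".");
    }

    public static void main(String[] args) {
        String converted = toDebeziumStyle("db.table_\\.*");
        System.out.println(converted);
        System.out.println(Pattern.matches(converted, "db.table_orders"));
    }
}
```

Note that a blanket comma replacement would also rewrite commas inside quantifiers such as `{1,3}`, so a real implementation may need to handle that case separately.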

Thorne and others added 3 commits January 29, 2026 20:15
…r DataStream API

This commit implements the binlog-only newly added table capture feature for MySQL CDC DataStream connector, allowing dynamic table discovery without snapshot phase.

Key changes:
- Add new config option 'scan.binlog.newly-added-table.enabled' in MySqlSourceOptions
- Add scanBinlogNewlyAddedTableEnabled field and getter in MySqlSourceConfig
- Implement table pattern conversion from Flink CDC style to Debezium style in MySqlSourceConfigFactory
- Add validation logic to ensure binlog-only mode works only with stream-only startup modes
- Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern
- Add logging in MySqlSnapshotSplitAssigner for binlog-only mode
- Expose scanBinlogNewlyAddedTableEnabled() API in MySqlSourceBuilder

The feature converts table patterns (e.g., "db.table_\.*") to Debezium regex style (e.g., "db\.table_.*")
and enables dynamic table discovery during binlog reading phase without triggering snapshots.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@github-actions github-actions bot added docs Improvements or additions to documentation mysql-cdc-connector labels Jan 30, 2026
@ThorneANN ThorneANN changed the title [Flink 38911][cdc connector mysql] mysql-cdc-connector datastream support scan.binlog.newly-added-table.enabled [Flink-38911][cdc connector mysql] mysql-cdc-connector datastream support scan.binlog.newly-added-table.enabled Feb 2, 2026
@ThorneANN
Contributor Author

@copilot review

@ThorneANN
Contributor Author

@Copilot review

Contributor

Copilot AI left a comment

Pull request overview

This pull request implements a new feature for the MySQL CDC DataStream connector to support capturing newly added tables in binlog-only mode without triggering a snapshot phase. The feature adds a new configuration option scan.binlog.newly-added-table.enabled that allows tables matching a pattern to be dynamically discovered and captured during the binlog reading phase.

Changes:

  • Added new configuration option scan.binlog.newly-added-table.enabled in MySqlSourceOptions with comprehensive documentation
  • Extended MySqlSource builder API and configuration classes to support the new feature
  • Added table pattern conversion logic from Flink CDC style to Debezium regex style
  • Added integration tests to validate binlog-only table capture functionality
  • Updated English and Chinese documentation with usage examples

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| MySqlSourceOptions.java | Added new experimental config option for binlog-only newly added table capture |
| MySqlSourceConfig.java | Added field and getter for scanBinlogNewlyAddedTableEnabled flag |
| MySqlSourceConfigFactory.java | Added pattern conversion logic and validation for mutually exclusive modes |
| MySqlSourceBuilder.java | Exposed scanBinlogNewlyAddedTableEnabled() API with detailed javadoc |
| MySqlTableSource.java | Threaded scanBinlogNewlyAddedTableEnabled through table source implementation |
| MySqlTableSourceFactory.java | Added config option to optional options and passed through to source |
| MySqlSnapshotSplitAssigner.java | Added early return in captureNewlyAddedTables when binlog-only mode is enabled |
| BinlogOnlyNewlyAddedTableITCase.java | Comprehensive integration tests for the new feature |
| binlog_test.sql | Test database schema documentation with setup comments |
| MySqlTableSourceFactoryTest.java | Updated test cases to include new config parameter |
| mysql-cdc.md (EN/ZH) | Added documentation for the new configuration option with pattern examples |

--
-- IMPORTANT: This SQL file defines the initial schema for reference and documentation.
-- The actual test creates tables dynamically during execution to validate binlog-only capture.
-- The initial_table is crea1ted in @BeforeEach to ensure binlog is active before CDC source starts.

Copilot AI Feb 11, 2026

There is a typo in the comment. "crea1ted" should be "created".

Suggested change
- -- The initial_table is crea1ted in @BeforeEach to ensure binlog is active before CDC source starts.
+ -- The initial_table is created in @BeforeEach to ensure binlog is active before CDC source starts.

throw new IllegalArgumentException(
"Cannot enable both 'scan.binlog.newly-added-table.enabled' and "
+ "'scan.newly-added-table.enabled' as they may cause duplicate data");
}

Copilot AI Feb 11, 2026

The PR description mentions that validation logic should ensure binlog-only mode works only with stream-only startup modes (latest, earliest, specific offset, timestamp). However, this validation is missing. When scanBinlogNewlyAddedTableEnabled is true but the startup mode is not stream-only (e.g., initial or snapshot), the behavior may be incorrect. Add validation to check if startupOptions.isStreamOnly() when scanBinlogNewlyAddedTableEnabled is enabled.

Suggested change
- }
+ }
+ // Validate: binlog-only newly-added-table scan requires a stream-only startup mode
+ if (scanBinlogNewlyAddedTableEnabled && !startupOptions.isStreamOnly()) {
+     throw new IllegalArgumentException(
+             "'scan.binlog.newly-added-table.enabled' can only be enabled when using a "
+                     + "stream-only startup mode (latest-offset, earliest-offset, specific-offset, or timestamp).");
+ }
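
To make the two checks discussed in this thread concrete, the following is a self-contained sketch of the validation rules. It does not use the connector's classes: the `StartupMode` enum and the `validate` method are hypothetical stand-ins, and the split of modes into stream-only and not stream-only follows the list given in the comment above.

```java
// Hypothetical sketch of the validation rules; not the connector's real types.
final class BinlogOnlyValidationSketch {

    // Assumed stand-in for the connector's startup options.
    enum StartupMode {
        INITIAL(false),
        SNAPSHOT(false),
        LATEST_OFFSET(true),
        EARLIEST_OFFSET(true),
        SPECIFIC_OFFSET(true),
        TIMESTAMP(true);

        final boolean streamOnly;

        StartupMode(boolean streamOnly) {
            this.streamOnly = streamOnly;
        }
    }

    static void validate(
            boolean scanNewlyAddedTableEnabled,
            boolean scanBinlogNewlyAddedTableEnabled,
            StartupMode startupMode) {
        // Rule 1: the two newly-added-table options are mutually exclusive.
        if (scanBinlogNewlyAddedTableEnabled && scanNewlyAddedTableEnabled) {
            throw new IllegalArgumentException(
                    "Cannot enable both 'scan.binlog.newly-added-table.enabled' and "
                            + "'scan.newly-added-table.enabled'");
        }
        // Rule 2: binlog-only capture requires a stream-only startup mode.
        if (scanBinlogNewlyAddedTableEnabled && !startupMode.streamOnly) {
            throw new IllegalArgumentException(
                    "'scan.binlog.newly-added-table.enabled' requires a stream-only startup mode");
        }
    }

    public static void main(String[] args) {
        validate(false, true, StartupMode.LATEST_OFFSET); // accepted
        try {
            validate(false, true, StartupMode.INITIAL);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```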

Comment on lines +225 to 236
// Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
LOG.info(
"Binlog-only newly added table capture is enabled. "
+ "New tables matching the pattern will be automatically captured "
+ "in binlog phase without snapshot.");
// No action needed here, BinlogSplitReader will handle the auto-capture
return;
}

// Don't scan newly added table in snapshot mode.
if (sourceConfig.isScanNewlyAddedTableEnabled()

Copilot AI Feb 11, 2026

The PR description states "Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern", and the comment here claims "BinlogSplitReader will handle the auto-capture", but this PR contains no changes to BinlogSplitReader.java. The existing logic in BinlogSplitReader.hasEnterPureBinlogPhase() (lines 318-323) checks only isScanNewlyAddedTableEnabled(), not isScanBinlogNewlyAddedTableEnabled(), so the feature will not work as described. BinlogSplitReader needs to check isScanBinlogNewlyAddedTableEnabled() as well and handle newly added tables accordingly.

Suggested change
- // Binlog-only mode: no action needed in Assigner, BinlogSplitReader handles it
- if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
-     LOG.info(
-             "Binlog-only newly added table capture is enabled. "
-                     + "New tables matching the pattern will be automatically captured "
-                     + "in binlog phase without snapshot.");
-     // No action needed here, BinlogSplitReader will handle the auto-capture
-     return;
- }
- // Don't scan newly added table in snapshot mode.
- if (sourceConfig.isScanNewlyAddedTableEnabled()
+ // Enable newly-added table discovery when either snapshot-based or binlog-only
+ // newly-added table capture is configured.
+ final boolean scanNewlyAddedTablesEnabled =
+         sourceConfig.isScanNewlyAddedTableEnabled()
+                 || sourceConfig.isScanBinlogNewlyAddedTableEnabled();
+ if (sourceConfig.isScanBinlogNewlyAddedTableEnabled()) {
+     LOG.info(
+             "Binlog-only newly added table capture is enabled. "
+                     + "New tables matching the pattern will be automatically captured.");
+ }
+ // Don't scan newly added table in snapshot-only startup mode.
+ if (scanNewlyAddedTablesEnabled

LOG.debug("After replacing comma with pipe separator: {}", tables);

// Step 2: Replace escaped dot \. with placeholder
// In Flink CDC, \. means any character in regex, in Debezium it should be .

Copilot AI Feb 11, 2026

The comment is incorrect. In standard regex notation (including Flink CDC), the backslash escapes the dot: \. means a literal dot character, while . (unescaped) means any single character. The comment should say "In Flink CDC, \. means a literal dot in regex, in Debezium it should be . (any character)".

Suggested change
- // In Flink CDC, \. means any character in regex, in Debezium it should be .
+ // In Flink CDC, \. means a literal dot in regex, in Debezium it should be . (any character).

Comment on lines 114 to 124
testDatabase.getDatabaseName() + ".user_.*",
"user_profiles",
"user_settings",
"user_logs");
}

@Test
void testBinlogOnlyCaptureWithDatabasePattern() throws Exception {
// Test with database.* pattern
testBinlogOnlyCaptureWithPattern(
testDatabase.getDatabaseName() + ".*", "product_inventory", "product_catalog");

Copilot AI Feb 11, 2026

There is an inconsistency between the test patterns and the documented pattern format. The javadoc in MySqlSourceBuilder (lines 231-235) shows patterns like "db\\.*" and "db\\.user_\\.*" (which become "db\.*" and "db\.user_\.*" as string values), but these tests use unescaped patterns like ".user_.*" and ".*". The pattern conversion function in MySqlSourceConfigFactory.convertToDebeziumStyle() expects escaped patterns (with backslash-dot), so these test patterns may not work correctly. Either the tests should use the documented format (e.g., "\\.user_\\.*"), or the conversion function needs to be updated to handle both escaped and unescaped patterns.
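
As background for this thread, the sketch below shows how escaped and unescaped dots behave under plain java.util.regex, independent of any Flink CDC or Debezium conversion. The class name and the table identifiers are made up for illustration; whether the connector matches full "database.table" identifiers in exactly this way is an assumption.

```java
import java.util.regex.Pattern;

// Illustrative only: standard Java regex semantics, no connector code involved.
final class DotSemanticsSketch {

    // Returns true when the whole identifier matches the regex.
    static boolean matches(String regex, String tableId) {
        return Pattern.matches(regex, tableId);
    }

    public static void main(String[] args) {
        String unescaped = "mydb.user_.*";  // '.' matches any single character
        String escaped = "mydb\\.user_.*";  // '\.' matches only a literal dot

        // Both patterns accept the intended table.
        System.out.println(matches(unescaped, "mydb.user_logs"));
        System.out.println(matches(escaped, "mydb.user_logs"));

        // Only the unescaped pattern accepts an identifier without a real dot.
        System.out.println(matches(unescaped, "mydbXuser_logs"));
        System.out.println(matches(escaped, "mydbXuser_logs"));
    }
}
```

An unescaped pattern is therefore a superset of the escaped one under standard regex rules, which is why a test written with unescaped dots can pass without exercising the escaped form that the javadoc documents.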


Labels

docs Improvements or additions to documentation mysql-cdc-connector

2 participants