From 72637e221a210940de11b044468458e9674c47ea Mon Sep 17 00:00:00 2001 From: tianfengyu Date: Thu, 2 Apr 2026 17:08:50 +0800 Subject: [PATCH 1/2] [FLINK-39391][cdc-connector] Propagate scan.snapshot.fetch.size to Debezium properties in Oracle, SqlServer, DB2, and Postgres connectors The user-configured scan.snapshot.fetch.size is stored in JdbcSourceConfig.fetchSize but was never written into the Debezium Properties object. The snapshot execution path reads fetchSize from Debezium ConnectorConfig (query.fetch.size for Oracle/SqlServer/DB2, snapshot.fetch.size for Postgres) instead of JdbcSourceConfig, so the user value was silently ignored. For Oracle/SqlServer/DB2 this defaults to 0, which causes the JDBC driver to use its own default (e.g. Oracle defaults to fetchSize=10). On high-latency networks this leads to severe performance degradation (observed ~9x slowdown at 31ms RTT vs 3ms RTT). This commit propagates fetchSize into the corresponding Debezium property before the user-defined debezium properties override, so explicit debezium.query.fetch.size / debezium.snapshot.fetch.size overrides still take precedence. MySQL connector is not affected as it already sets database.fetchSize. Made-with: Cursor --- .../connectors/db2/source/config/Db2SourceConfigFactory.java | 2 ++ .../oracle/source/config/OracleSourceConfigFactory.java | 2 ++ .../postgres/source/config/PostgresSourceConfigFactory.java | 2 ++ .../sqlserver/source/config/SqlServerSourceConfigFactory.java | 2 ++ 4 files changed, 8 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java index 365e7871099..af4a45a9c3c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactory.java @@ -75,6 +75,8 @@ public Db2SourceConfig create(int subtask) { throw new UnsupportedOperationException(); } + props.setProperty("query.fetch.size", String.valueOf(fetchSize)); + if (dbzProperties != null) { props.putAll(dbzProperties); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java index a255b380626..38186343777 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java @@ -103,6 +103,8 @@ public OracleSourceConfig create(int subtaskId) { props.setProperty("table.include.list", String.join(",", tableList)); } + props.setProperty("query.fetch.size", String.valueOf(fetchSize)); + // override the user-defined debezium properties if (dbzProperties != null) { props.putAll(dbzProperties); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 847b1547461..d8ec3f02670 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -100,6 +100,8 @@ public PostgresSourceConfig create(int subtaskId) { props.setProperty("table.include.list", String.join(",", tableList)); } + props.setProperty("snapshot.fetch.size", String.valueOf(fetchSize)); + // override the user-defined debezium properties if (dbzProperties != null) { props.putAll(dbzProperties); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java index 661094a8942..4e956d9b38d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactory.java @@ -75,6 +75,8 @@ public SqlServerSourceConfig create(int subtask) { throw new UnsupportedOperationException(); } + props.setProperty("query.fetch.size", String.valueOf(fetchSize)); + if (dbzProperties != null) { props.putAll(dbzProperties); } From a067b6e3c8a004c7ddc1723b47ab1e24c721a180 Mon Sep 17 00:00:00 2001 From: tianfengyu Date: Thu, 2 Apr 2026 17:34:08 +0800 Subject: [PATCH 2/2] [FLINK-39391][cdc-connector] Add unit tests for fetchSize propagation to Debezium properties Verify that scan.snapshot.fetch.size is correctly propagated to the underlying Debezium query.fetch.size (Oracle/SqlServer/DB2) and snapshot.fetch.size (Postgres) properties, including default value propagation and user-provided debezium property override behavior. Made-with: Cursor --- .../config/Db2SourceConfigFactoryTest.java | 76 +++++++++++++++++++ .../config/OracleSourceConfigFactoryTest.java | 76 +++++++++++++++++++ .../PostgresSourceConfigFactoryTest.java | 76 +++++++++++++++++++ .../SqlServerSourceConfigFactoryTest.java | 76 +++++++++++++++++++ 4 files changed, 304 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactoryTest.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactoryTest.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactoryTest.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactoryTest.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactoryTest.java new file mode 100644 index 00000000000..caa5f21d265 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/source/config/Db2SourceConfigFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.db2.source.config; + +import org.apache.flink.cdc.connectors.base.options.StartupOptions; + +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link Db2SourceConfigFactory}. */ +class Db2SourceConfigFactoryTest { + + @Test + void testFetchSizePropagatedToDebeziumProperties() { + Db2SourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + + Db2SourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("5000"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(5000); + } + + @Test + void testDefaultFetchSizePropagatedToDebeziumProperties() { + Db2SourceConfigFactory factory = createFactory(); + + Db2SourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("1024"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(1024); + } + + @Test + void testDebeziumPropertiesCanOverrideFetchSize() { + Db2SourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + Properties dbzProps = new Properties(); + dbzProps.setProperty("query.fetch.size", "8000"); + factory.debeziumProperties(dbzProps); + + Db2SourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("8000"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(8000); + } + + private static Db2SourceConfigFactory createFactory() { + Db2SourceConfigFactory factory = new Db2SourceConfigFactory(); + factory.hostname("localhost"); + factory.port(50000); + factory.databaseList("myDB"); + factory.username("user"); + factory.password("password"); + factory.startupOptions(StartupOptions.initial()); + return factory; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactoryTest.java new file mode 100644 index 00000000000..591492cd7b6 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/config/OracleSourceConfigFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source.config; + +import org.apache.flink.cdc.connectors.base.options.StartupOptions; + +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OracleSourceConfigFactory}. */ +class OracleSourceConfigFactoryTest { + + @Test + void testFetchSizePropagatedToDebeziumProperties() { + OracleSourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + + OracleSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("5000"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(5000); + } + + @Test + void testDefaultFetchSizePropagatedToDebeziumProperties() { + OracleSourceConfigFactory factory = createFactory(); + + OracleSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("1024"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(1024); + } + + @Test + void testDebeziumPropertiesCanOverrideFetchSize() { + OracleSourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + Properties dbzProps = new Properties(); + dbzProps.setProperty("query.fetch.size", "8000"); + factory.debeziumProperties(dbzProps); + + OracleSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("8000"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(8000); + } + + private static OracleSourceConfigFactory createFactory() { + OracleSourceConfigFactory factory = new OracleSourceConfigFactory(); + factory.hostname("localhost"); + factory.port(1521); + factory.databaseList("MYDB"); + factory.username("user"); + factory.password("password"); + factory.startupOptions(StartupOptions.initial()); + return factory; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactoryTest.java new file mode 100644 index 00000000000..07c9aad3dd0 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.postgres.source.config; + +import org.apache.flink.cdc.connectors.base.options.StartupOptions; + +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PostgresSourceConfigFactory}. */ +class PostgresSourceConfigFactoryTest { + + @Test + void testFetchSizePropagatedToDebeziumProperties() { + PostgresSourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + + PostgresSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("snapshot.fetch.size")).isEqualTo("5000"); + assertThat(config.getDbzConnectorConfig().getSnapshotFetchSize()).isEqualTo(5000); + } + + @Test + void testDefaultFetchSizePropagatedToDebeziumProperties() { + PostgresSourceConfigFactory factory = createFactory(); + + PostgresSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("snapshot.fetch.size")).isEqualTo("1024"); + assertThat(config.getDbzConnectorConfig().getSnapshotFetchSize()).isEqualTo(1024); + } + + @Test + void testDebeziumPropertiesCanOverrideFetchSize() { + PostgresSourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + Properties dbzProps = new Properties(); + dbzProps.setProperty("snapshot.fetch.size", "8000"); + factory.debeziumProperties(dbzProps); + + PostgresSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("snapshot.fetch.size")).isEqualTo("8000"); + assertThat(config.getDbzConnectorConfig().getSnapshotFetchSize()).isEqualTo(8000); + } + + private static PostgresSourceConfigFactory createFactory() { + PostgresSourceConfigFactory factory = new PostgresSourceConfigFactory(); + factory.hostname("localhost"); + factory.port(5432); + factory.database("myDB"); + factory.username("user"); + factory.password("password"); + factory.startupOptions(StartupOptions.initial()); + return factory; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactoryTest.java new file mode 100644 index 00000000000..f647b1205a5 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/config/SqlServerSourceConfigFactoryTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.sqlserver.source.config; + +import org.apache.flink.cdc.connectors.base.options.StartupOptions; + +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SqlServerSourceConfigFactory}. */ +class SqlServerSourceConfigFactoryTest { + + @Test + void testFetchSizePropagatedToDebeziumProperties() { + SqlServerSourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + + SqlServerSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("5000"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(5000); + } + + @Test + void testDefaultFetchSizePropagatedToDebeziumProperties() { + SqlServerSourceConfigFactory factory = createFactory(); + + SqlServerSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("1024"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(1024); + } + + @Test + void testDebeziumPropertiesCanOverrideFetchSize() { + SqlServerSourceConfigFactory factory = createFactory(); + factory.fetchSize(5000); + Properties dbzProps = new Properties(); + dbzProps.setProperty("query.fetch.size", "8000"); + factory.debeziumProperties(dbzProps); + + SqlServerSourceConfig config = factory.create(0); + + assertThat(config.getDbzProperties().getProperty("query.fetch.size")).isEqualTo("8000"); + assertThat(config.getDbzConnectorConfig().getQueryFetchSize()).isEqualTo(8000); + } + + private static SqlServerSourceConfigFactory createFactory() { + SqlServerSourceConfigFactory factory = new SqlServerSourceConfigFactory(); + factory.hostname("localhost"); + factory.port(1433); + factory.databaseList("myDB"); + factory.username("user"); + factory.password("password"); + factory.startupOptions(StartupOptions.initial()); + return factory; + } +}