From 52e719fe80e2220b03b0b8b84be0777cc7d79ce4 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 8 Apr 2026 11:21:31 +0800 Subject: [PATCH 1/7] support no pk table for pg --- .../IncrementalSourceScanFetcher.java | 10 +- .../external/JdbcSourceFetchTaskContext.java | 70 ++++++--- .../connectors/base/utils/SplitKeyUtils.java | 27 +++- .../postgres/source/PostgresDialect.java | 55 +++++++ .../table/PostgreSQLConnectorITCase.java | 142 ++++++++++++++++++ .../src/test/resources/ddl/inventory.sql | 22 ++- 6 files changed, 304 insertions(+), 22 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java index 3136bb97d25..19ebbaf900b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -26,6 +26,7 @@ import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder; import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.data.Envelope; import io.debezium.pipeline.DataChangeEvent; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -177,7 +178,14 @@ public Iterator pollWithBuffer() throws InterruptedException { } if (!reachChangeLogStart) { - outputBuffer.put((Struct) record.key(), record); + if (record.key() != null) { + outputBuffer.put((Struct) record.key(), record); + } else { + // For tables without primary key, use after struct as buffer key + Struct value = (Struct) record.value(); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + outputBuffer.put(after, record); + } } else { if (isChangeRecordInChunkRange(record)) { // rewrite overlapping snapshot records through the record key diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java index 9975331e469..e89d7357395 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -38,6 +38,8 @@ import io.debezium.util.SchemaNameAdjuster; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.Collection; @@ -49,6 +51,8 @@ @Internal public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context { + private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class); + protected final JdbcSourceConfig sourceConfig; protected final JdbcDataSourceDialect dataSourceDialect; protected CommonConnectorConfig dbzConnectorConfig; @@ -92,33 +96,46 @@ public Object[] getSplitKey(SourceRecord record) { @Override public void rewriteOutputBuffer( Map outputBuffer, SourceRecord changeRecord) { - Struct key = (Struct) changeRecord.key(); + boolean hasPrimaryKey = changeRecord.key() != null; Struct value = (Struct) changeRecord.value(); if (value != null) { Envelope.Operation operation = Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); switch (operation) { case CREATE: + if (hasPrimaryKey) { + outputBuffer.put( + (Struct) changeRecord.key(), + buildReadRecord(changeRecord)); + } else { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + outputBuffer.put(after, buildReadRecord(changeRecord)); + } + break; case UPDATE: - Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema()); - Struct source = value.getStruct(Envelope.FieldName.SOURCE); - Struct after = value.getStruct(Envelope.FieldName.AFTER); - Instant fetchTs = - Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP)); - SourceRecord record = - new SourceRecord( - changeRecord.sourcePartition(), - changeRecord.sourceOffset(), - changeRecord.topic(), - changeRecord.kafkaPartition(), - changeRecord.keySchema(), - changeRecord.key(), - changeRecord.valueSchema(), - envelope.read(after, source, fetchTs)); - outputBuffer.put(key, record); + if (hasPrimaryKey) { + outputBuffer.put( + (Struct) changeRecord.key(), + buildReadRecord(changeRecord)); + } else { + // For no-PK table: remove the before image, insert the after image + Struct before = value.getStruct(Envelope.FieldName.BEFORE); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + if (before != null) { + outputBuffer.remove(before); + } + outputBuffer.put(after, buildReadRecord(changeRecord)); + } break; case DELETE: - outputBuffer.remove(key); + if (hasPrimaryKey) { + outputBuffer.remove((Struct) changeRecord.key()); + } else { + Struct before = value.getStruct(Envelope.FieldName.BEFORE); + if (before != null) { + outputBuffer.remove(before); + } + } break; case READ: throw new IllegalStateException( @@ -129,6 +146,23 @@ public void rewriteOutputBuffer( } } + private SourceRecord buildReadRecord(SourceRecord changeRecord) { + Struct value = (Struct) changeRecord.value(); + Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema()); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + Instant fetchTs = Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP)); + return new SourceRecord( + changeRecord.sourcePartition(), + changeRecord.sourceOffset(), + changeRecord.topic(), + changeRecord.kafkaPartition(), + changeRecord.keySchema(), + changeRecord.key(), + changeRecord.valueSchema(), + envelope.read(after, source, fetchTs)); + } + @Override public List formatMessageTimestamp(Collection snapshotRecords) { return snapshotRecords.stream() diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java index 94be44aa653..7a0f6eb1943 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; import org.apache.flink.table.types.logical.RowType; +import io.debezium.data.Envelope; import io.debezium.util.SchemaNameAdjuster; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -237,7 +238,29 @@ public static Object[] getSplitKey( RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) { // the split key field contains single field now String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0)); - Struct key = (Struct) dataRecord.key(); - return new Object[] {key.get(splitFieldName)}; + Struct target; + if (dataRecord.key() != null) { + target = (Struct) dataRecord.key(); + } else { + // For tables without primary key, extract chunk key from value struct + target = getStructContainingChunkKey(dataRecord); + } + return new Object[] {target.get(splitFieldName)}; + } + + /** + * For tables without primary key, the chunk key is not in the record key (which is null). + * Instead, extract it from the value's after struct (for CREATE/READ) or before struct (for + * UPDATE/DELETE). + */ + public static Struct getStructContainingChunkKey(SourceRecord record) { + Struct value = (Struct) record.value(); + Envelope.Operation op = + Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + return value.getStruct(Envelope.FieldName.AFTER); + } else { + return value.getStruct(Envelope.FieldName.BEFORE); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index 4743247899a..6cd4a23ae38 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -44,10 +44,13 @@ import io.debezium.connector.postgresql.connection.PostgresConnectionUtils; import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.relational.history.TableChanges.TableChange; import io.debezium.schema.TopicSelector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -64,6 +67,7 @@ /** The dialect for Postgres. */ public class PostgresDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PostgresDialect.class); private static final String CONNECTION_NAME = "postgres-cdc-connector"; private final PostgresSourceConfig sourceConfig; @@ -196,6 +200,8 @@ public Map discoverDataCollectionSchemas(JdbcSourceConfig try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { // fetch table schemas Map tableSchemas = queryTableSchema(jdbc, capturedTableIds); + // validate REPLICA IDENTITY for tables without primary key + validateReplicaIdentityForNoPkTables(jdbc, tableSchemas); return tableSchemas; } catch (Exception e) { throw new FlinkRuntimeException( @@ -203,6 +209,55 @@ public Map discoverDataCollectionSchemas(JdbcSourceConfig } } + /** + * Validates that tables without primary key have REPLICA IDENTITY FULL set. This is required + * for PostgreSQL to include the full before image in WAL for UPDATE and DELETE events. + */ + private void validateReplicaIdentityForNoPkTables( + JdbcConnection jdbc, Map tableSchemas) throws SQLException { + for (Map.Entry entry : tableSchemas.entrySet()) { + TableId tableId = entry.getKey(); + Table table = entry.getValue().getTable(); + if (table.primaryKeyColumnNames().isEmpty()) { + String replicaIdentity = queryReplicaIdentity(jdbc, tableId); + if (!"f".equalsIgnoreCase(replicaIdentity)) { + throw new FlinkRuntimeException( + String.format( + "Table '%s.%s' has no primary key. " + + "To use incremental snapshot for tables without primary key, " + + "REPLICA IDENTITY FULL must be set. " + + "Please execute: ALTER TABLE %s.%s REPLICA IDENTITY FULL", + tableId.schema(), + tableId.table(), + tableId.schema(), + tableId.table())); + } + LOG.info( + "Table '{}.{}' has no primary key but has REPLICA IDENTITY FULL set.", + tableId.schema(), + tableId.table()); + } + } + } + + private String queryReplicaIdentity(JdbcConnection jdbc, TableId tableId) throws SQLException { + String query = + String.format( + "SELECT relreplident FROM pg_class c " + + "JOIN pg_namespace n ON c.relnamespace = n.oid " + + "WHERE n.nspname = '%s' AND c.relname = '%s'", + tableId.schema(), tableId.table()); + final String[] result = new String[1]; + jdbc.query( + query, + rs -> { + if (rs.next()) { + result[0] = rs.getString(1); + } + }); + return result[0]; + } + @Override public JdbcConnectionPoolFactory getPooledDataSourceFactory() { return new PostgresConnectionPoolFactory(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index c3f117076fc..810425948b2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -1160,4 +1160,146 @@ void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws Except tableResult.getJobClient().get().cancel().get(); RowUtils.USE_LEGACY_TO_STRING = true; } + + @Test + void testNoPKTableWithChunkKey() + throws SQLException, ExecutionException, InterruptedException { + setup(true); + initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + String sourceDDL = + String.format( + "CREATE TABLE no_pk_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'scan.incremental.snapshot.chunk.key-column' = 'id'," + + " 'scan.incremental.snapshot.chunk.size' = '4'," + + " 'decoding.plugin.name' = 'pgoutput', " + + " 'slot.name' = '%s'" + + ")", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), + "inventory", + "products_no_pk", + getSlotName()); + String sinkDDL = + "CREATE TABLE sink (" + + " name STRING," + + " weightSum DECIMAL(10,3)," + + " PRIMARY KEY (name) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT name, SUM(weight) FROM no_pk_source GROUP BY name"); + + waitForSnapshotStarted("sink"); + + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); + + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE inventory.products_no_pk SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE inventory.products_no_pk SET weight='5.1' WHERE id=107;"); + statement.execute( + "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);"); + statement.execute( + "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); + statement.execute( + "UPDATE inventory.products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE inventory.products_no_pk SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM inventory.products_no_pk WHERE id=111;"); + } + + String[] expected = + new String[] { + "scooter,3.140", + "car battery,8.100", + "12-pack drill bits,0.800", + "hammer,2.625", + "rocks,5.100", + "jacket,0.600", + "spare tire,22.200" + }; + waitForSinkResult("sink", Arrays.asList(expected)); + + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); + Assertions.assertThat(actual).containsExactlyInAnyOrder(expected); + + result.getJobClient().get().cancel().get(); + } + + @Test + void testNoPKTableWithoutChunkKey() + throws SQLException, ExecutionException, InterruptedException { + setup(true); + initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + String sourceDDL = + String.format( + "CREATE TABLE no_pk_source_fail (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'decoding.plugin.name' = 'pgoutput', " + + " 'slot.name' = '%s'" + + ")", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), + "inventory", + "products_no_pk", + getSlotName()); + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM no_pk_source_fail"); + + Assertions.assertThatThrownBy(result::await) + .hasStackTraceContaining( + "scan.incremental.snapshot.chunk.key-column"); + + result.getJobClient().ifPresent(client -> { + try { + client.cancel().get(); + } catch (Exception e) { + // ignore + } + }); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql index 9cf06bf5560..4a5d2475cd2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql @@ -37,4 +37,24 @@ VALUES (default,'scooter','Small 2-wheel scooter',3.14), (default,'hammer','16oz carpenter''s hammer',1.0), (default,'rocks','box of assorted rocks',5.3), (default,'jacket','water resistent black wind breaker',0.1), - (default,'spare tire','24 inch spare tire',22.2); \ No newline at end of file + (default,'spare tire','24 inch spare tire',22.2); + +-- Create table without primary key for no-PK table tests +CREATE TABLE products_no_pk ( + id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT +); +ALTER TABLE products_no_pk REPLICA IDENTITY FULL; + +INSERT INTO products_no_pk +VALUES (101,'scooter','Small 2-wheel scooter',3.14), + (102,'car battery','12V car battery',8.1), + (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8), + (104,'hammer','12oz carpenter''s hammer',0.75), + (105,'hammer','14oz carpenter''s hammer',0.875), + (106,'hammer','16oz carpenter''s hammer',1.0), + (107,'rocks','box of assorted rocks',5.3), + (108,'jacket','water resistent black wind breaker',0.1), + (109,'spare tire','24 inch spare tire',22.2); \ No newline at end of file From 55170b16e4a8c0c1d05f7748dd22dd0894e6ff4f Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 8 Apr 2026 11:30:30 +0800 Subject: [PATCH 2/7] code style --- .../source/reader/external/JdbcSourceFetchTaskContext.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java index e89d7357395..0109c923121 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -105,8 +105,7 @@ public void rewriteOutputBuffer( case CREATE: if (hasPrimaryKey) { outputBuffer.put( - (Struct) changeRecord.key(), - buildReadRecord(changeRecord)); + (Struct) changeRecord.key(), buildReadRecord(changeRecord)); } else { Struct after = value.getStruct(Envelope.FieldName.AFTER); outputBuffer.put(after, buildReadRecord(changeRecord)); @@ -115,8 +114,7 @@ public void rewriteOutputBuffer( case UPDATE: if (hasPrimaryKey) { outputBuffer.put( - (Struct) changeRecord.key(), - buildReadRecord(changeRecord)); + (Struct) changeRecord.key(), buildReadRecord(changeRecord)); } else { // For no-PK table: remove the before image, insert the after image Struct before = value.getStruct(Envelope.FieldName.BEFORE); From 24a46fd4fc9aa519121c05d1e6d8b8dcca896eab Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 8 Apr 2026 12:10:35 +0800 Subject: [PATCH 3/7] code style --- .../table/PostgreSQLConnectorITCase.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 810425948b2..8c064c16fae 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -1162,8 +1162,7 @@ void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws Except } @Test - void testNoPKTableWithChunkKey() - throws SQLException, ExecutionException, InterruptedException { + void testNoPKTableWithChunkKey() throws SQLException, ExecutionException, InterruptedException { setup(true); initializePostgresTable(POSTGRES_CONTAINER, "inventory"); String sourceDDL = @@ -1291,15 +1290,16 @@ void testNoPKTableWithoutChunkKey() TableResult result = tEnv.executeSql("SELECT * FROM no_pk_source_fail"); Assertions.assertThatThrownBy(result::await) - .hasStackTraceContaining( - "scan.incremental.snapshot.chunk.key-column"); - - result.getJobClient().ifPresent(client -> { - try { - client.cancel().get(); - } catch (Exception e) { - // ignore - } - }); + .hasStackTraceContaining("scan.incremental.snapshot.chunk.key-column"); + + result.getJobClient() + .ifPresent( + client -> { + try { + client.cancel().get(); + } catch (Exception e) { + // ignore + } + }); } } From 6f34a1ffcaff6dc3fb3c202e598e37e295807e4c Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Wed, 8 Apr 2026 12:41:55 +0800 Subject: [PATCH 4/7] fix --- .../table/PostgreSQLConnectorITCase.java | 2 ++ .../src/test/resources/ddl/inventory.sql | 22 +----------- .../test/resources/ddl/inventory_no_pk.sql | 36 +++++++++++++++++++ 3 files changed, 39 insertions(+), 21 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 8c064c16fae..15684552cb7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -1165,6 +1165,7 @@ void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws Except void testNoPKTableWithChunkKey() throws SQLException, ExecutionException, InterruptedException { setup(true); initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + initializePostgresTable(POSTGRES_CONTAINER, "inventory_no_pk"); String sourceDDL = String.format( "CREATE TABLE no_pk_source (" @@ -1256,6 +1257,7 @@ void testNoPKTableWithoutChunkKey() throws SQLException, ExecutionException, InterruptedException { setup(true); initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + initializePostgresTable(POSTGRES_CONTAINER, "inventory_no_pk"); String sourceDDL = String.format( "CREATE TABLE no_pk_source_fail (" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql index 4a5d2475cd2..9cf06bf5560 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory.sql @@ -37,24 +37,4 @@ VALUES (default,'scooter','Small 2-wheel scooter',3.14), (default,'hammer','16oz carpenter''s hammer',1.0), (default,'rocks','box of assorted rocks',5.3), (default,'jacket','water resistent black wind breaker',0.1), - (default,'spare tire','24 inch spare tire',22.2); - --- Create table without primary key for no-PK table tests -CREATE TABLE products_no_pk ( - id INTEGER NOT NULL, - name VARCHAR(255) NOT NULL DEFAULT 'flink', - description VARCHAR(512), - weight FLOAT -); -ALTER TABLE products_no_pk REPLICA IDENTITY FULL; - -INSERT INTO products_no_pk -VALUES (101,'scooter','Small 2-wheel scooter',3.14), - (102,'car battery','12V car battery',8.1), - (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8), - (104,'hammer','12oz carpenter''s hammer',0.75), - (105,'hammer','14oz carpenter''s hammer',0.875), - (106,'hammer','16oz carpenter''s hammer',1.0), - (107,'rocks','box of assorted rocks',5.3), - (108,'jacket','water resistent black wind breaker',0.1), - (109,'spare tire','24 inch spare tire',22.2); \ No newline at end of file + (default,'spare tire','24 inch spare tire',22.2); \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql new file mode 100644 index 00000000000..5aedb559bf1 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql @@ -0,0 +1,36 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +SET search_path TO inventory; + +-- Create table without primary key for no-PK table tests +CREATE TABLE products_no_pk ( + id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT +); +ALTER TABLE products_no_pk REPLICA IDENTITY FULL; + +INSERT INTO products_no_pk +VALUES (101,'scooter','Small 2-wheel scooter',3.14), + (102,'car battery','12V car battery',8.1), + (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8), + (104,'hammer','12oz carpenter''s hammer',0.75), + (105,'hammer','14oz carpenter''s hammer',0.875), + (106,'hammer','16oz carpenter''s hammer',1.0), + (107,'rocks','box of assorted rocks',5.3), + (108,'jacket','water resistent black wind breaker',0.1), + (109,'spare tire','24 inch spare tire',22.2); From 50ff23127210936de6d0c310600c3d324691e90b Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 9 Apr 2026 20:43:51 +0800 Subject: [PATCH 5/7] fix --- .../external/JdbcSourceFetchTaskContext.java | 5 +-- .../connectors/base/utils/SplitKeyUtils.java | 2 +- .../postgres/source/PostgresDialect.java | 20 +++++++--- .../table/PostgreSQLConnectorITCase.java | 38 +++++++++---------- .../test/resources/ddl/inventory_no_pk.sql | 2 +- 5 files changed, 36 insertions(+), 31 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java index 0109c923121..e378ea69e56 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -38,8 +38,6 @@ import io.debezium.util.SchemaNameAdjuster; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.Instant; import java.util.Collection; @@ -51,7 +49,6 @@ @Internal public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context { - private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class); protected final JdbcSourceConfig sourceConfig; protected final JdbcDataSourceDialect dataSourceDialect; @@ -138,7 +135,7 @@ public void rewriteOutputBuffer( case READ: throw new IllegalStateException( String.format( - "Data change record shouldn't use READ operation, the the record is %s.", + "Data change record shouldn't use READ operation, the record is %s.", changeRecord)); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java index 7a0f6eb1943..9e7d2b2d623 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java @@ -253,7 +253,7 @@ public static Object[] getSplitKey( * Instead, extract it from the value's after struct (for CREATE/READ) or before struct (for * UPDATE/DELETE). */ - public static Struct getStructContainingChunkKey(SourceRecord record) { + static Struct getStructContainingChunkKey(SourceRecord record) { Struct value = (Struct) record.value(); Envelope.Operation op = Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index 6cd4a23ae38..9b7e068f2ef 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -242,19 +242,27 @@ private void validateReplicaIdentityForNoPkTables( private String queryReplicaIdentity(JdbcConnection jdbc, TableId tableId) throws SQLException { String query = - String.format( - "SELECT relreplident FROM pg_class c " - + "JOIN pg_namespace n ON c.relnamespace = n.oid " - + "WHERE n.nspname = '%s' AND c.relname = '%s'", - tableId.schema(), tableId.table()); + "SELECT relreplident FROM pg_class c " + + "JOIN pg_namespace n ON c.relnamespace = n.oid " + + "WHERE n.nspname = ? AND c.relname = ?"; final String[] result = new String[1]; - jdbc.query( + jdbc.prepareQuery( query, + ps -> { + ps.setString(1, tableId.schema()); + ps.setString(2, tableId.table()); + }, rs -> { if (rs.next()) { result[0] = rs.getString(1); } }); + if (result[0] == null) { + throw new FlinkRuntimeException( + String.format( + "Failed to query replica identity for table '%s.%s': table was not found.", + tableId.schema(), tableId.table())); + } return result[0]; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index 15684552cb7..ebeafb4a959 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -177,11 +177,11 @@ void testConsumingAllEvents(boolean parallelismSnapshot) "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -221,7 +221,7 @@ void testConsumingAllEvents(boolean parallelismSnapshot) * | 107 | rocks | box of assorted rocks | 5.1 | * | 108 | jacket | water resistent black wind breaker | 0.1 | * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * | 110 | jacket | new water resistant white wind breaker | 0.5 | * +-----+--------------------+------------------------------------------------- * --------+--------+ * @@ -317,7 +317,7 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( - "INSERT INTO inventory_partitioned.products VALUES (default,'jacket','water resistent white wind breaker',0.2, 'us');"); // 110 + "INSERT INTO inventory_partitioned.products VALUES (default,'jacket','water resistant white wind breaker',0.2, 'us');"); // 110 statement.execute( "INSERT INTO inventory_partitioned.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, 'uk');"); statement.execute( @@ -340,7 +340,7 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) "107,rocks,box of assorted rocks,5.300,uk", "108,jacket,water resistent black wind breaker,0.100,uk", "109,spare tire,24 inch spare tire,22.200,uk", - "110,jacket,water resistent white wind breaker,0.200,us", + "110,jacket,water resistant white wind breaker,0.200,us", "111,scooter,Big 2-wheel scooter ,5.180,uk", "112,bike,Big 2-wheel bycicle ,6.180,china" }; @@ -407,11 +407,11 @@ void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception { try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -419,7 +419,7 @@ void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception { // Note: We use waitForSinkResult instead of waitForSinkSize because Flink 2.x // optimizations may reduce the number of intermediate changelog messages String[] expected = - new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; + new String[] {"110,jacket,new water resistant white wind breaker,0.500"}; waitForSinkResult("sink", Arrays.asList(expected)); @@ -589,11 +589,11 @@ void testExceptionForReplicaIdentity(boolean parallelismSnapshot) throws Excepti "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -787,11 +787,11 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Throwable { "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -831,7 +831,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Throwable { + ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)", "+I(" + databaseName - + ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)", + + ",inventory,products,+I,110,jacket,water resistant white wind breaker,0.200)", "+I(" + databaseName + ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)", @@ -843,7 +843,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Throwable { + ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)", "+U(" + databaseName - + ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)", + + ",inventory,products,+U,110,jacket,new water resistant white wind breaker,0.500)", "+U(" + databaseName + ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)", @@ -921,11 +921,11 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception { "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -965,7 +965,7 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception { * | 107 | rocks | box of assorted rocks | 5.1 | * | 108 | jacket | water resistent black wind breaker | 0.1 | * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * | 110 | jacket | new water resistant white wind breaker | 0.5 | * +-----+--------------------+------------------------------------------------- * --------+--------+ * @@ -1225,11 +1225,11 @@ void testNoPKTableWithChunkKey() throws SQLException, ExecutionException, Interr "UPDATE inventory.products_no_pk SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products_no_pk SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistent white wind breaker',0.2);"); + "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistant white wind breaker',0.2);"); statement.execute( "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); statement.execute( - "UPDATE inventory.products_no_pk SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products_no_pk SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products_no_pk SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products_no_pk WHERE id=111;"); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql index 5aedb559bf1..b79ccfb29a5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql @@ -32,5 +32,5 @@ VALUES (101,'scooter','Small 2-wheel scooter',3.14), (105,'hammer','14oz carpenter''s hammer',0.875), (106,'hammer','16oz carpenter''s hammer',1.0), (107,'rocks','box of assorted rocks',5.3), - (108,'jacket','water resistent black wind breaker',0.1), + (108,'jacket','water resistant black wind breaker',0.1), (109,'spare tire','24 inch spare tire',22.2); From e02ce7d4df6fe8b600163a741f53e47d5fbb2388 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 9 Apr 2026 21:04:36 +0800 Subject: [PATCH 6/7] fix --- .../base/source/reader/external/JdbcSourceFetchTaskContext.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java index e378ea69e56..28e5d5aedba 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -49,7 +49,6 @@ @Internal public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context { - protected final JdbcSourceConfig sourceConfig; protected final JdbcDataSourceDialect dataSourceDialect; protected CommonConnectorConfig dbzConnectorConfig; From 4188eea45c8a6f7eb0a5a1b3d6b757c3ff99df2e Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 9 Apr 2026 21:22:48 +0800 Subject: [PATCH 7/7] fix --- .../flink/cdc/connectors/postgres/source/PostgresDialect.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index 9b7e068f2ef..5f61b6faa70 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -224,8 +224,8 @@ private void validateReplicaIdentityForNoPkTables( throw new FlinkRuntimeException( String.format( "Table '%s.%s' has no primary key. " - + "To use incremental snapshot for tables without primary key, " - + "REPLICA IDENTITY FULL must be set. " + + "REPLICA IDENTITY FULL must be set for tables without primary key, " + + "otherwise UPDATE and DELETE events will not be captured. " + "Please execute: ALTER TABLE %s.%s REPLICA IDENTITY FULL", tableId.schema(), tableId.table(),