diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java index 3136bb97d25..19ebbaf900b 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -26,6 +26,7 @@ import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder; import io.debezium.connector.base.ChangeEventQueue; +import io.debezium.data.Envelope; import io.debezium.pipeline.DataChangeEvent; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -177,7 +178,14 @@ public Iterator pollWithBuffer() throws InterruptedException { } if (!reachChangeLogStart) { - outputBuffer.put((Struct) record.key(), record); + if (record.key() != null) { + outputBuffer.put((Struct) record.key(), record); + } else { + // For tables without primary key, use after struct as buffer key + Struct value = (Struct) record.value(); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + outputBuffer.put(after, record); + } } else { if (isChangeRecordInChunkRange(record)) { // rewrite overlapping snapshot records through the record key diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java index 9975331e469..28e5d5aedba 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java @@ -92,43 +92,71 @@ public Object[] getSplitKey(SourceRecord record) { @Override public void rewriteOutputBuffer( Map outputBuffer, SourceRecord changeRecord) { - Struct key = (Struct) changeRecord.key(); + boolean hasPrimaryKey = changeRecord.key() != null; Struct value = (Struct) changeRecord.value(); if (value != null) { Envelope.Operation operation = Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); switch (operation) { case CREATE: + if (hasPrimaryKey) { + outputBuffer.put( + (Struct) changeRecord.key(), buildReadRecord(changeRecord)); + } else { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + outputBuffer.put(after, buildReadRecord(changeRecord)); + } + break; case UPDATE: - Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema()); - Struct source = value.getStruct(Envelope.FieldName.SOURCE); - Struct after = value.getStruct(Envelope.FieldName.AFTER); - Instant fetchTs = - Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP)); - SourceRecord record = - new SourceRecord( - changeRecord.sourcePartition(), - changeRecord.sourceOffset(), - changeRecord.topic(), - changeRecord.kafkaPartition(), - changeRecord.keySchema(), - changeRecord.key(), - changeRecord.valueSchema(), - envelope.read(after, source, fetchTs)); - outputBuffer.put(key, record); + if (hasPrimaryKey) { + outputBuffer.put( + (Struct) changeRecord.key(), buildReadRecord(changeRecord)); + } else { + // For no-PK table: remove the before image, insert the after image + Struct before = value.getStruct(Envelope.FieldName.BEFORE); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + if (before != null) { + outputBuffer.remove(before); + } + outputBuffer.put(after, buildReadRecord(changeRecord)); + } break; case DELETE: - outputBuffer.remove(key); + if (hasPrimaryKey) { + outputBuffer.remove((Struct) changeRecord.key()); + } else { + Struct before = value.getStruct(Envelope.FieldName.BEFORE); + if (before != null) { + outputBuffer.remove(before); + } + } break; case READ: throw new IllegalStateException( String.format( - "Data change record shouldn't use READ operation, the the record is %s.", + "Data change record shouldn't use READ operation, the record is %s.", changeRecord)); } } } + private SourceRecord buildReadRecord(SourceRecord changeRecord) { + Struct value = (Struct) changeRecord.value(); + Envelope envelope = Envelope.fromSchema(changeRecord.valueSchema()); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + Struct after = value.getStruct(Envelope.FieldName.AFTER); + Instant fetchTs = Instant.ofEpochMilli((Long) source.get(Envelope.FieldName.TIMESTAMP)); + return new SourceRecord( + changeRecord.sourcePartition(), + changeRecord.sourceOffset(), + changeRecord.topic(), + changeRecord.kafkaPartition(), + changeRecord.keySchema(), + changeRecord.key(), + changeRecord.valueSchema(), + envelope.read(after, source, fetchTs)); + } + @Override public List formatMessageTimestamp(Collection snapshotRecords) { return snapshotRecords.stream() diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java index 94be44aa653..9e7d2b2d623 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo; import org.apache.flink.table.types.logical.RowType; +import io.debezium.data.Envelope; import io.debezium.util.SchemaNameAdjuster; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -237,7 +238,29 @@ public static Object[] getSplitKey( RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) { // the split key field contains single field now String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0)); - Struct key = (Struct) dataRecord.key(); - return new Object[] {key.get(splitFieldName)}; + Struct target; + if (dataRecord.key() != null) { + target = (Struct) dataRecord.key(); + } else { + // For tables without primary key, extract chunk key from value struct + target = getStructContainingChunkKey(dataRecord); + } + return new Object[] {target.get(splitFieldName)}; + } + + /** + * For tables without primary key, the chunk key is not in the record key (which is null). + * Instead, extract it from the value's after struct (for CREATE/READ) or before struct (for + * UPDATE/DELETE). + */ + static Struct getStructContainingChunkKey(SourceRecord record) { + Struct value = (Struct) record.value(); + Envelope.Operation op = + Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + return value.getStruct(Envelope.FieldName.AFTER); + } else { + return value.getStruct(Envelope.FieldName.BEFORE); + } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java index 4743247899a..5f61b6faa70 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java @@ -44,10 +44,13 @@ import io.debezium.connector.postgresql.connection.PostgresConnectionUtils; import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.relational.history.TableChanges.TableChange; import io.debezium.schema.TopicSelector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -64,6 +67,7 @@ /** The dialect for Postgres. */ public class PostgresDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PostgresDialect.class); private static final String CONNECTION_NAME = "postgres-cdc-connector"; private final PostgresSourceConfig sourceConfig; @@ -196,6 +200,8 @@ public Map discoverDataCollectionSchemas(JdbcSourceConfig try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) { // fetch table schemas Map tableSchemas = queryTableSchema(jdbc, capturedTableIds); + // validate REPLICA IDENTITY for tables without primary key + validateReplicaIdentityForNoPkTables(jdbc, tableSchemas); return tableSchemas; } catch (Exception e) { throw new FlinkRuntimeException( @@ -203,6 +209,63 @@ public Map discoverDataCollectionSchemas(JdbcSourceConfig } } + /** + * Validates that tables without primary key have REPLICA IDENTITY FULL set. This is required + * for PostgreSQL to include the full before image in WAL for UPDATE and DELETE events. + */ + private void validateReplicaIdentityForNoPkTables( + JdbcConnection jdbc, Map tableSchemas) throws SQLException { + for (Map.Entry entry : tableSchemas.entrySet()) { + TableId tableId = entry.getKey(); + Table table = entry.getValue().getTable(); + if (table.primaryKeyColumnNames().isEmpty()) { + String replicaIdentity = queryReplicaIdentity(jdbc, tableId); + if (!"f".equalsIgnoreCase(replicaIdentity)) { + throw new FlinkRuntimeException( + String.format( + "Table '%s.%s' has no primary key. " + + "REPLICA IDENTITY FULL must be set for tables without primary key, " + + "otherwise UPDATE and DELETE events will not be captured. " + + "Please execute: ALTER TABLE %s.%s REPLICA IDENTITY FULL", + tableId.schema(), + tableId.table(), + tableId.schema(), + tableId.table())); + } + LOG.info( + "Table '{}.{}' has no primary key but has REPLICA IDENTITY FULL set.", + tableId.schema(), + tableId.table()); + } + } + } + + private String queryReplicaIdentity(JdbcConnection jdbc, TableId tableId) throws SQLException { + String query = + "SELECT relreplident FROM pg_class c " + + "JOIN pg_namespace n ON c.relnamespace = n.oid " + + "WHERE n.nspname = ? AND c.relname = ?"; + final String[] result = new String[1]; + jdbc.prepareQuery( + query, + ps -> { + ps.setString(1, tableId.schema()); + ps.setString(2, tableId.table()); + }, + rs -> { + if (rs.next()) { + result[0] = rs.getString(1); + } + }); + if (result[0] == null) { + throw new FlinkRuntimeException( + String.format( + "Failed to query replica identity for table '%s.%s': table was not found.", + tableId.schema(), tableId.table())); + } + return result[0]; + } + @Override public JdbcConnectionPoolFactory getPooledDataSourceFactory() { return new PostgresConnectionPoolFactory(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java index c3f117076fc..ebeafb4a959 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java @@ -177,11 +177,11 @@ void testConsumingAllEvents(boolean parallelismSnapshot) "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -221,7 +221,7 @@ void testConsumingAllEvents(boolean parallelismSnapshot) * | 107 | rocks | box of assorted rocks | 5.1 | * | 108 | jacket | water resistent black wind breaker | 0.1 | * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * | 110 | jacket | new water resistant white wind breaker | 0.5 | * +-----+--------------------+------------------------------------------------- * --------+--------+ * @@ -317,7 +317,7 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( - "INSERT INTO inventory_partitioned.products VALUES (default,'jacket','water resistent white wind breaker',0.2, 'us');"); // 110 + "INSERT INTO inventory_partitioned.products VALUES (default,'jacket','water resistant white wind breaker',0.2, 'us');"); // 110 statement.execute( "INSERT INTO inventory_partitioned.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, 'uk');"); statement.execute( @@ -340,7 +340,7 @@ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot) "107,rocks,box of assorted rocks,5.300,uk", "108,jacket,water resistent black wind breaker,0.100,uk", "109,spare tire,24 inch spare tire,22.200,uk", - "110,jacket,water resistent white wind breaker,0.200,us", + "110,jacket,water resistant white wind breaker,0.200,us", "111,scooter,Big 2-wheel scooter ,5.180,uk", "112,bike,Big 2-wheel bycicle ,6.180,china" }; @@ -407,11 +407,11 @@ void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception { try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); Statement statement = connection.createStatement()) { statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -419,7 +419,7 @@ void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception { // Note: We use waitForSinkResult instead of waitForSinkSize because Flink 2.x // optimizations may reduce the number of intermediate changelog messages String[] expected = - new String[] {"110,jacket,new water resistent white wind breaker,0.500"}; + new String[] {"110,jacket,new water resistant white wind breaker,0.500"}; waitForSinkResult("sink", Arrays.asList(expected)); @@ -589,11 +589,11 @@ void testExceptionForReplicaIdentity(boolean parallelismSnapshot) throws Excepti "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -787,11 +787,11 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Throwable { "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -831,7 +831,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Throwable { + ",inventory,products,+I,109,spare tire,24 inch spare tire,22.200)", "+I(" + databaseName - + ",inventory,products,+I,110,jacket,water resistent white wind breaker,0.200)", + + ",inventory,products,+I,110,jacket,water resistant white wind breaker,0.200)", "+I(" + databaseName + ",inventory,products,+I,111,scooter,Big 2-wheel scooter ,5.180)", @@ -843,7 +843,7 @@ void testMetadataColumns(boolean parallelismSnapshot) throws Throwable { + ",inventory,products,+U,107,rocks,box of assorted rocks,5.100)", "+U(" + databaseName - + ",inventory,products,+U,110,jacket,new water resistent white wind breaker,0.500)", + + ",inventory,products,+U,110,jacket,new water resistant white wind breaker,0.500)", "+U(" + databaseName + ",inventory,products,+U,111,scooter,Big 2-wheel scooter ,5.170)", @@ -921,11 +921,11 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception { "UPDATE inventory.products SET description='18oz carpenter hammer' WHERE id=106;"); statement.execute("UPDATE inventory.products SET weight='5.1' WHERE id=107;"); statement.execute( - "INSERT INTO inventory.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + "INSERT INTO inventory.products VALUES (default,'jacket','water resistant white wind breaker',0.2);"); // 110 statement.execute( "INSERT INTO inventory.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); statement.execute( - "UPDATE inventory.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + "UPDATE inventory.products SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); statement.execute("UPDATE inventory.products SET weight='5.17' WHERE id=111;"); statement.execute("DELETE FROM inventory.products WHERE id=111;"); } @@ -965,7 +965,7 @@ void testUpsertMode(boolean parallelismSnapshot) throws Exception { * | 107 | rocks | box of assorted rocks | 5.1 | * | 108 | jacket | water resistent black wind breaker | 0.1 | * | 109 | spare tire | 24 inch spare tire | 22.2 | - * | 110 | jacket | new water resistent white wind breaker | 0.5 | + * | 110 | jacket | new water resistant white wind breaker | 0.5 | * +-----+--------------------+------------------------------------------------- * --------+--------+ * @@ -1160,4 +1160,148 @@ void testUniqueIndexIncludingFunction(boolean parallelismSnapshot) throws Except tableResult.getJobClient().get().cancel().get(); RowUtils.USE_LEGACY_TO_STRING = true; } + + @Test + void testNoPKTableWithChunkKey() throws SQLException, ExecutionException, InterruptedException { + setup(true); + initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + initializePostgresTable(POSTGRES_CONTAINER, "inventory_no_pk"); + String sourceDDL = + String.format( + "CREATE TABLE no_pk_source (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'scan.incremental.snapshot.chunk.key-column' = 'id'," + + " 'scan.incremental.snapshot.chunk.size' = '4'," + + " 'decoding.plugin.name' = 'pgoutput', " + + " 'slot.name' = '%s'" + + ")", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), + "inventory", + "products_no_pk", + getSlotName()); + String sinkDDL = + "CREATE TABLE sink (" + + " name STRING," + + " weightSum DECIMAL(10,3)," + + " PRIMARY KEY (name) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT name, SUM(weight) FROM no_pk_source GROUP BY name"); + + waitForSnapshotStarted("sink"); + + // wait a bit to make sure the replication slot is ready + Thread.sleep(5000); + + // generate WAL + try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER); + Statement statement = connection.createStatement()) { + statement.execute( + "UPDATE inventory.products_no_pk SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE inventory.products_no_pk SET weight='5.1' WHERE id=107;"); + statement.execute( + "INSERT INTO inventory.products_no_pk VALUES (110,'jacket','water resistant white wind breaker',0.2);"); + statement.execute( + "INSERT INTO inventory.products_no_pk VALUES (111,'scooter','Big 2-wheel scooter',5.18);"); + statement.execute( + "UPDATE inventory.products_no_pk SET description='new water resistant white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE inventory.products_no_pk SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM inventory.products_no_pk WHERE id=111;"); + } + + String[] expected = + new String[] { + "scooter,3.140", + "car battery,8.100", + "12-pack drill bits,0.800", + "hammer,2.625", + "rocks,5.100", + "jacket,0.600", + "spare tire,22.200" + }; + waitForSinkResult("sink", Arrays.asList(expected)); + + List actual = TestValuesTableFactory.getResultsAsStrings("sink"); + Assertions.assertThat(actual).containsExactlyInAnyOrder(expected); + + result.getJobClient().get().cancel().get(); + } + + @Test + void testNoPKTableWithoutChunkKey() + throws SQLException, ExecutionException, InterruptedException { + setup(true); + initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + initializePostgresTable(POSTGRES_CONTAINER, "inventory_no_pk"); + String sourceDDL = + String.format( + "CREATE TABLE no_pk_source_fail (" + + " id INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)" + + ") WITH (" + + " 'connector' = 'postgres-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'schema-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = 'true'," + + " 'decoding.plugin.name' = 'pgoutput', " + + " 'slot.name' = '%s'" + + ")", + POSTGRES_CONTAINER.getHost(), + POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT), + POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword(), + POSTGRES_CONTAINER.getDatabaseName(), + "inventory", + "products_no_pk", + getSlotName()); + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM no_pk_source_fail"); + + Assertions.assertThatThrownBy(result::await) + .hasStackTraceContaining("scan.incremental.snapshot.chunk.key-column"); + + result.getJobClient() + .ifPresent( + client -> { + try { + client.cancel().get(); + } catch (Exception e) { + // ignore + } + }); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql new file mode 100644 index 00000000000..b79ccfb29a5 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql @@ -0,0 +1,36 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +SET search_path TO inventory; + +-- Create table without primary key for no-PK table tests +CREATE TABLE products_no_pk ( + id INTEGER NOT NULL, + name VARCHAR(255) NOT NULL DEFAULT 'flink', + description VARCHAR(512), + weight FLOAT +); +ALTER TABLE products_no_pk REPLICA IDENTITY FULL; + +INSERT INTO products_no_pk +VALUES (101,'scooter','Small 2-wheel scooter',3.14), + (102,'car battery','12V car battery',8.1), + (103,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8), + (104,'hammer','12oz carpenter''s hammer',0.75), + (105,'hammer','14oz carpenter''s hammer',0.875), + (106,'hammer','16oz carpenter''s hammer',1.0), + (107,'rocks','box of assorted rocks',5.3), + (108,'jacket','water resistant black wind breaker',0.1), + (109,'spare tire','24 inch spare tire',22.2);