Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.data.Envelope;
import io.debezium.pipeline.DataChangeEvent;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -177,7 +178,14 @@ public Iterator<SourceRecords> pollWithBuffer() throws InterruptedException {
}

if (!reachChangeLogStart) {
outputBuffer.put((Struct) record.key(), record);
if (record.key() != null) {
outputBuffer.put((Struct) record.key(), record);
} else {
// For tables without primary key, use after struct as buffer key
Struct value = (Struct) record.value();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
outputBuffer.put(after, record);
}
} else {
if (isChangeRecordInChunkRange(record)) {
// rewrite overlapping snapshot records through the record key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,43 +92,71 @@ public Object[] getSplitKey(SourceRecord record) {
/**
 * Rewrites the snapshot output buffer with an incoming change (binlog/WAL) record so that the
 * buffered snapshot reflects the change.
 *
 * <p>For tables with a primary key the record key is used as the buffer key. For tables without
 * a primary key the record key is null, so the full row image (after image for inserts, before
 * image for removals) is used as the buffer key instead.
 *
 * @param outputBuffer snapshot records keyed by primary key (or full row image for no-PK tables)
 * @param changeRecord the change event to fold into the buffer
 */
@Override
public void rewriteOutputBuffer(
        Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
    boolean hasPrimaryKey = changeRecord.key() != null;
    Struct value = (Struct) changeRecord.value();
    if (value != null) {
        Envelope.Operation operation =
                Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
        switch (operation) {
            case CREATE:
                if (hasPrimaryKey) {
                    outputBuffer.put(
                            (Struct) changeRecord.key(), buildReadRecord(changeRecord));
                } else {
                    // For no-PK table, the after image acts as the buffer key.
                    Struct after = value.getStruct(Envelope.FieldName.AFTER);
                    outputBuffer.put(after, buildReadRecord(changeRecord));
                }
                break;
            case UPDATE:
                if (hasPrimaryKey) {
                    outputBuffer.put(
                            (Struct) changeRecord.key(), buildReadRecord(changeRecord));
                } else {
                    // For no-PK table: remove the before image, insert the after image
                    Struct before = value.getStruct(Envelope.FieldName.BEFORE);
                    Struct after = value.getStruct(Envelope.FieldName.AFTER);
                    if (before != null) {
                        outputBuffer.remove(before);
                    }
                    outputBuffer.put(after, buildReadRecord(changeRecord));
                }
                break;
            case DELETE:
                if (hasPrimaryKey) {
                    outputBuffer.remove((Struct) changeRecord.key());
                } else {
                    // For no-PK table the before image identifies the row to drop.
                    Struct before = value.getStruct(Envelope.FieldName.BEFORE);
                    if (before != null) {
                        outputBuffer.remove(before);
                    }
                }
                break;
            case READ:
                // Snapshot READ events must not appear in the change stream.
                throw new IllegalStateException(
                        String.format(
                                "Data change record shouldn't use READ operation, the record is %s.",
                                changeRecord));
        }
    }
}

/**
 * Converts a change record into an equivalent READ-operation record carrying the after image,
 * while preserving the original partition, offset, topic, key and schemas.
 */
private SourceRecord buildReadRecord(SourceRecord changeRecord) {
    final Struct changeValue = (Struct) changeRecord.value();
    final Struct sourceInfo = changeValue.getStruct(Envelope.FieldName.SOURCE);
    final Struct afterImage = changeValue.getStruct(Envelope.FieldName.AFTER);
    final Long timestampMillis = (Long) sourceInfo.get(Envelope.FieldName.TIMESTAMP);
    final Envelope valueEnvelope = Envelope.fromSchema(changeRecord.valueSchema());
    final Struct readValue =
            valueEnvelope.read(afterImage, sourceInfo, Instant.ofEpochMilli(timestampMillis));
    return new SourceRecord(
            changeRecord.sourcePartition(),
            changeRecord.sourceOffset(),
            changeRecord.topic(),
            changeRecord.kafkaPartition(),
            changeRecord.keySchema(),
            changeRecord.key(),
            changeRecord.valueSchema(),
            readValue);
}

@Override
public List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
return snapshotRecords.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.table.types.logical.RowType;

import io.debezium.data.Envelope;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -237,7 +238,29 @@ public static Object[] getSplitKey(
RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
// the split key field contains single field now
String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
Struct key = (Struct) dataRecord.key();
return new Object[] {key.get(splitFieldName)};
Struct target;
if (dataRecord.key() != null) {
target = (Struct) dataRecord.key();
} else {
// For tables without primary key, extract chunk key from value struct
target = getStructContainingChunkKey(dataRecord);
}
return new Object[] {target.get(splitFieldName)};
}

/**
 * For tables without primary key, the chunk key is not in the record key (which is null).
 * Instead, extract it from the value's after struct (for CREATE/READ) or before struct (for
 * UPDATE/DELETE).
 */
static Struct getStructContainingChunkKey(SourceRecord record) {
    Struct value = (Struct) record.value();
    Envelope.Operation operation =
            Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
    switch (operation) {
        case CREATE:
        case READ:
            // Inserts and snapshot reads only carry an after image.
            return value.getStruct(Envelope.FieldName.AFTER);
        default:
            // Updates and deletes identify the affected row via the before image.
            return value.getStruct(Envelope.FieldName.BEFORE);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@
import io.debezium.connector.postgresql.connection.PostgresConnectionUtils;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.TopicSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

Expand All @@ -64,6 +67,7 @@
/** The dialect for Postgres. */
public class PostgresDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(PostgresDialect.class);
private static final String CONNECTION_NAME = "postgres-cdc-connector";

private final PostgresSourceConfig sourceConfig;
Expand Down Expand Up @@ -196,13 +200,72 @@ public Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
// fetch table schemas
Map<TableId, TableChange> tableSchemas = queryTableSchema(jdbc, capturedTableIds);
// validate REPLICA IDENTITY for tables without primary key
validateReplicaIdentityForNoPkTables(jdbc, tableSchemas);
return tableSchemas;
} catch (Exception e) {
throw new FlinkRuntimeException(
"Error to discover table schemas: " + e.getMessage(), e);
}
}

/**
 * Validates that tables without primary key have REPLICA IDENTITY FULL set. This is required
 * for PostgreSQL to include the full before image in WAL for UPDATE and DELETE events.
 *
 * @param jdbc open JDBC connection used to query the catalog
 * @param tableSchemas discovered schemas of all captured tables
 * @throws SQLException if the catalog lookup fails
 */
private void validateReplicaIdentityForNoPkTables(
        JdbcConnection jdbc, Map<TableId, TableChange> tableSchemas) throws SQLException {
    for (Map.Entry<TableId, TableChange> entry : tableSchemas.entrySet()) {
        Table table = entry.getValue().getTable();
        if (!table.primaryKeyColumnNames().isEmpty()) {
            // Tables with a primary key need no replica-identity check.
            continue;
        }
        TableId tableId = entry.getKey();
        String replicaIdentity = queryReplicaIdentity(jdbc, tableId);
        if (!"f".equalsIgnoreCase(replicaIdentity)) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Table '%s.%s' has no primary key. "
                                    + "REPLICA IDENTITY FULL must be set for tables without primary key, "
                                    + "otherwise UPDATE and DELETE events will not be captured. "
                                    + "Please execute: ALTER TABLE %s.%s REPLICA IDENTITY FULL",
                            tableId.schema(),
                            tableId.table(),
                            tableId.schema(),
                            tableId.table()));
        }
        LOG.info(
                "Table '{}.{}' has no primary key but has REPLICA IDENTITY FULL set.",
                tableId.schema(),
                tableId.table());
    }
}

/**
 * Looks up the replica identity code of the given table from {@code pg_class.relreplident}
 * ('f' indicates FULL, as checked by the caller).
 *
 * @throws SQLException if the query fails
 * @throws FlinkRuntimeException if the table is not found in the catalog
 */
private String queryReplicaIdentity(JdbcConnection jdbc, TableId tableId) throws SQLException {
    final String sql =
            "SELECT relreplident FROM pg_class c "
                    + "JOIN pg_namespace n ON c.relnamespace = n.oid "
                    + "WHERE n.nspname = ? AND c.relname = ?";
    // Single-slot holder so the result-set callback can publish the value.
    final String[] holder = new String[1];
    jdbc.prepareQuery(
            sql,
            statement -> {
                statement.setString(1, tableId.schema());
                statement.setString(2, tableId.table());
            },
            resultSet -> {
                if (resultSet.next()) {
                    holder[0] = resultSet.getString(1);
                }
            });
    String replicaIdentity = holder[0];
    if (replicaIdentity == null) {
        throw new FlinkRuntimeException(
                String.format(
                        "Failed to query replica identity for table '%s.%s': table was not found.",
                        tableId.schema(), tableId.table()));
    }
    return replicaIdentity;
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new PostgresConnectionPoolFactory();
Expand Down
Loading
Loading