[FLINK-39409][connect/postgres] Support tables without primary key for incremental snapshot#4367

Open
JNSimba wants to merge 7 commits into apache:master from JNSimba:feature/postgres-no-pk-table-support

Conversation


@JNSimba JNSimba commented Apr 8, 2026

Summary

This PR adds support for PostgreSQL tables without primary keys in incremental snapshot mode,
following the same approach as MySQL (PR #2150).

Changes

JDBC Base Layer (flink-cdc-base):

  • SplitKeyUtils.getSplitKey(): Handle null record key by extracting chunk key from value's
    after/before struct instead of record key
  • IncrementalSourceScanFetcher.pollWithBuffer(): Use after struct as buffer key when
    record key is null (no-PK table)
  • JdbcSourceFetchTaskContext.rewriteOutputBuffer(): Support the no-PK merge logic: for tables
    without a primary key, use the before/after struct as the buffer key for CREATE/UPDATE/DELETE operations
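The split-key fallback described above can be sketched as follows. This is a simplified illustration, not the actual SplitKeyUtils code: rows are modeled as plain maps instead of Kafka Connect Struct/SourceRecord objects, and the method name mirrors getSplitKey() only loosely.

```java
import java.util.Map;

// Hypothetical simplified model: a change record has a key map (null for
// no-PK tables) and an envelope with "before"/"after" row images.
class SplitKeySketch {
    // When the record key is null (no-PK table), fall back to the value's
    // after image (or before image, for DELETEs) and read the configured
    // chunk key column from there.
    static Object getSplitKey(
            Map<String, Object> recordKey,
            Map<String, Object> after,
            Map<String, Object> before,
            String chunkKeyColumn) {
        if (recordKey != null) {
            // PK table: the record key already carries the chunk key column.
            return recordKey.get(chunkKeyColumn);
        }
        Map<String, Object> row = (after != null) ? after : before;
        if (row == null) {
            throw new IllegalStateException(
                    "Record has neither a key nor a before/after image");
        }
        return row.get(chunkKeyColumn);
    }
}
```

The same fallback order (after first, then before) covers INSERT/UPDATE records, which always carry an after image, and DELETE records, which only carry a before image.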

PostgreSQL Connector (flink-connector-postgres-cdc):

  • PostgresDialect: Validate that tables without a primary key have REPLICA IDENTITY FULL
    set by querying pg_class.relreplident, failing fast with a clear error message if not
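The validation could look roughly like the sketch below. The class and method names are illustrative, not the actual PostgresDialect code; only the pg_class.relreplident column and its single-character values are PostgreSQL facts ('d' = default/primary key, 'f' = FULL, 'n' = nothing, 'i' = index).

```java
// Illustrative sketch of the fail-fast check; not the connector's real code.
class ReplicaIdentityCheckSketch {
    // Hypothetical query the dialect could issue per discovered table.
    static final String QUERY =
            "SELECT relreplident FROM pg_class WHERE oid = ?::regclass";

    static void validateNoPkTable(String tableId, char relreplident) {
        if (relreplident != 'f') {
            throw new IllegalStateException(
                    String.format(
                            "Table %s has no primary key and its REPLICA IDENTITY is '%c'; "
                                    + "run ALTER TABLE %s REPLICA IDENTITY FULL so the WAL "
                                    + "carries full before images for UPDATE/DELETE events.",
                            tableId, relreplident, tableId));
        }
    }
}
```

Running the check at startup (rather than on the first UPDATE/DELETE) surfaces the misconfiguration before any snapshot work begins.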

Tests:

  • Added products_no_pk table DDL (without PK, with REPLICA IDENTITY FULL)
  • testNoPKTableWithChunkKey: End-to-end test for no-PK table with scan.incremental.snapshot.chunk.key-column configured
  • testNoPKTableWithoutChunkKey: Verify ValidationException is thrown when chunk key column is not specified

Design Decisions

  • No interface change: Kept Map<Struct, SourceRecord> in FetchTask.Context.rewriteOutputBuffer()
    unchanged to avoid breaking MongoDB and other connectors. For no-PK tables, the full row struct
    (all columns via REPLICA IDENTITY FULL) serves as the buffer key.
  • At-least-once for chunk key updates: When an UPDATE changes the chunk key column value and
    crosses split boundaries, only at-least-once semantics can be guaranteed (consistent with MySQL behavior).
  • REPLICA IDENTITY FULL required: PostgreSQL requires this setting on no-PK tables to provide
    full before image in WAL for UPDATE/DELETE events. The connector validates this at startup.
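The full-row-as-buffer-key idea from the first design decision can be sketched like this. It is a simplified model, not the actual rewriteOutputBuffer() implementation: rows are plain maps rather than Kafka Connect Structs, and the stored value is just the operation code.

```java
import java.util.HashMap;
import java.util.Map;

// With REPLICA IDENTITY FULL, before/after images carry every column, so the
// full row can serve as the buffer key in place of the missing PK struct.
class NoPkBufferSketch {
    final Map<Map<String, Object>, String> buffer = new HashMap<>();

    void apply(String op, Map<String, Object> before, Map<String, Object> after) {
        switch (op) {
            case "c": // CREATE: key the entry by the new full row
                buffer.put(after, "c");
                break;
            case "u": // UPDATE: the row (and thus the key) may have changed,
                //        so drop the old full-row key and insert the new one
                buffer.remove(before);
                buffer.put(after, "u");
                break;
            case "d": // DELETE: drop the full-row key
                buffer.remove(before);
                break;
            default:
                throw new IllegalArgumentException("Unexpected op: " + op);
        }
    }
}
```

The UPDATE branch is why at-least-once is the best achievable guarantee when the chunk key column itself changes: the before and after rows map to different keys, and possibly to different splits.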

Contributor

Copilot AI left a comment


Pull request overview

Adds support for PostgreSQL tables without primary keys when running in incremental snapshot mode, aligning behavior with the existing MySQL approach.

Changes:

  • Extend incremental snapshot split-key extraction and output buffering to handle Debezium records with null keys (no-PK tables).
  • Add PostgreSQL-side validation to require REPLICA IDENTITY FULL for no-PK tables.
  • Add end-to-end Postgres IT coverage and new DDL for a no-PK test table.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.

Summary per file:

  • flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_no_pk.sql: Adds a no-PK test table and seed data (with REPLICA IDENTITY FULL).
  • flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java: Adds IT cases for no-PK tables with/without chunk key.
  • flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java: Validates REPLICA IDENTITY FULL for no-PK tables during schema discovery.
  • flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SplitKeyUtils.java: Extracts the chunk key from the value (before/after) when the record key is null.
  • flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java: Supports buffer rewrite/merge logic for no-PK tables by using before/after structs as buffer keys.
  • flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java: Uses the value's after struct as the initial snapshot buffer key when the record key is null.
Comments suppressed due to low confidence (1)

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java:142

  • Typo/grammar in exception message: "the the record" should be corrected to "the record" (and/or rephrase for clarity).
                    throw new IllegalStateException(
                            String.format(
                                    "Data change record shouldn't use READ operation, the the record is %s.",
                                    changeRecord));


