[FLINK-39582][postgres] Allow logical messages#4387

Open
eskabetxe wants to merge 4 commits into apache:master from eskabetxe:FLINK-39582

@eskabetxe
Member

@eskabetxe eskabetxe commented Apr 29, 2026

What is the purpose of the change

Enable Postgres logical decoding messages produced by pg_logical_emit_message(transactional, prefix, content) to flow through to the user's DebeziumDeserializationSchema. Today these records are silently dropped before reaching deserialization. See [FLINK-39582] for the problem statement and root cause.

Brief change log

Base extension hooks (no domain logic; pure extensibility):

  • DataSourceDialect: new default createStreamFetcher(FetchTask.Context, int) factory returning the standard IncrementalSourceStreamFetcher. Existing dialects inherit unchanged behavior.
  • IncrementalSourceSplitReader.getStreamFetcher(): now obtains the fetcher via dataSourceDialect.createStreamFetcher(...) instead of direct instantiation.
  • IncrementalSourceStreamFetcher.shouldEmit(): visibility private → protected so dialect subclasses can override.
  • IncrementalSourceRecordEmitter.updateStreamSplitState(): visibility private → protected for the same reason.

Postgres connector:

  • New PostgresSourceStreamFetcher extends the base; overrides shouldEmit() to early-return true for op="m" records when the feature flag is enabled, otherwise delegates to super. Also exposes a package-private isLogicalMessage(SourceRecord) helper.
  • PostgresDialect.createStreamFetcher() returns PostgresSourceStreamFetcher with the flag read from PostgresSourceConfig.
  • PostgresSourceRecordEmitter.processElement() adds a branch for logical messages: updates the stream split state and emits to deserialization. Falls through to super otherwise.

Configuration:

  • New option scan.logical-message.enabled (boolean, default false) on PostgresSourceOptions. Default preserves existing behavior.
  • Plumbed through PostgresSourceConfig (field + getter), PostgresSourceConfigFactory (setter), and PostgresSourceBuilder.includeLogicalMessages(boolean) (DataStream API).

Design notes

  • Why a dialect factory hook instead of an isLogicalMessage() guard in the base classes? op="m" is only produced by the Postgres connector in this repo. Putting the check in shared code would leak Postgres-specific semantics into flink-cdc-base. The factory hook is a generic extensibility point; the actual logic stays in the Postgres module.
  • Why not override isDataChangeRecord() in the Postgres FetchTask.Context? It's semantically wrong — a logical message is a data change. It would also affect SourceReaderMetrics and other call sites, hiding logical messages from telemetry.
  • Content is emitted as-is. pg_logical_emit_message accepts bytea — arbitrary binary content. The content field is delivered as raw byte[] (Kafka Connect Schema.BYTES). The base64-looking output you'll see in JSON-converted records is the standard JsonConverter encoding for BYTES; Avro/Protobuf converters carry raw bytes. Interpretation belongs in the user's deserializer (they know what their prefix means), not the connector.
  • Default is false. Existing pipelines see no behavior change. Opt-in feature.
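
To illustrate the "content is emitted as-is" note above, here is a minimal sketch of why the payload looks like base64 in JSON output and how a user deserializer could turn it back into text. It uses only the JDK; the class and method names are hypothetical, and it assumes the payload is UTF-8 text, which the connector itself does not guarantee.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical demo class; not part of the connector.
final class LogicalMessageContentDemo {

    /** Renders raw bytes the way a JSON encoding of a BYTES field usually looks: base64 text. */
    static String toJsonText(byte[] content) {
        return Base64.getEncoder().encodeToString(content);
    }

    /** What a user deserializer might do when it knows its payload is UTF-8 text (an assumption). */
    static String decodeUtf8(String base64Text) {
        return new String(Base64.getDecoder().decode(base64Text), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        String jsonText = toJsonText(raw);
        System.out.println(jsonText);             // aGVsbG8=
        System.out.println(decodeUtf8(jsonText)); // hello
    }
}
```

With Avro or Protobuf converters the deserializer would receive the bytes directly and could skip the base64 step.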

Verifying this change

Added tests under flink-connector-postgres-cdc/src/test/java/.../source/reader/:

  • PostgresSourceStreamFetcherTest — unit tests for isLogicalMessage() covering: op="m" (true); op="c"/"u"/"d"/"r"/"t" (false); null value; Struct without op field; Struct with null op value; non-Struct value.
  • PostgresSourceStreamFetcherITCase — testcontainer-based integration test (PG 14, pgoutput, Debezium 1.9.8):
  1. logicalMessagesAreEmittedWhenEnabled: with flag on, calls pg_logical_emit_message(false, 'cdc-test', 'hello') and polls until an op="m" record appears.
  2. logicalMessagesAreDroppedWhenDisabled: with flag off, calls the same followed by a marker INSERT. Polls until the marker arrives (proving the WAL has advanced past the message), then asserts no op="m" record was seen.
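
The unit-test cases listed for isLogicalMessage() can be sketched without any Kafka Connect dependency. This is only an illustration: a plain Map stands in for Struct, and the class name is hypothetical. A real Struct would likely need a schema lookup before reading op, because reading a field that is not in the schema raises an error rather than returning null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the PR's helper; a Map replaces Kafka Connect's Struct.
final class LogicalMessageCheck {

    /** Returns true only when the value is struct-like and its "op" field equals "m". */
    static boolean isLogicalMessage(Object value) {
        if (!(value instanceof Map)) {
            return false; // covers both a null value and a non-struct value
        }
        Object op = ((Map<?, ?>) value).get("op"); // null when the field is absent or null
        return "m".equals(op);
    }

    public static void main(String[] args) {
        Map<String, Object> nullOp = new HashMap<>();
        nullOp.put("op", null);

        System.out.println(isLogicalMessage(Map.of("op", "m"))); // true
        System.out.println(isLogicalMessage(Map.of("op", "c"))); // false
        System.out.println(isLogicalMessage(Map.of("id", 1)));   // false, no op field
        System.out.println(isLogicalMessage(nullOp));            // false, null op value
        System.out.println(isLogicalMessage(null));              // false, null value
        System.out.println(isLogicalMessage("not a struct"));    // false, non-struct value
    }
}
```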

@eskabetxe
Member Author

@GOODBOY008 can you help review this?

@leonardBang leonardBang requested a review from loserwang1024 May 6, 2026 02:09
@loserwang1024
Contributor

@eskabetxe, pg_logical_emit_message is tightly coupled with the user's business logic rather than being a CDC log that PostgreSQL generates from DML operations. Personally, I am skeptical about whether it should be introduced at all. @leonardBang, WDYT?

If it is introduced, what happens if the messages emitted via pg_logical_emit_message do not align with the table schema? If you insist on introducing it, please add a configuration option to explicitly enable it. Do not alter the existing logic, so that currently running jobs that work as expected are not disrupted.

@eskabetxe
Member Author

eskabetxe commented May 6, 2026

Hi @loserwang1024,
it's opt-in via the config scan.logical-message.enabled (boolean, default false).

@leonardBang
Contributor

> @eskabetxe, pg_logical_emit_message is more tightly coupled with the user's business logic rather than being a CDC log generated by PostgreSQL based on DML operations. Personally, I am skeptical about whether it should be introduced at all. @leonardBang, WDYT?
>
> If it is introduced, what happens if the messages emitted via pg_logical_emit_message do not align with the table schema? If you insist on introducing it, please add a configuration option to explicitly enable it. Do not alter the existing logic to avoid disrupting currently running jobs that are working as expected.

+1 to introducing a config option and exposing it only in the DataStream API. We should always disable this feature for the SQL and YAML APIs, as it would crash the entire pipeline once we deliver this kind of event. WDYT?

@eskabetxe
Member Author

@leonardBang this is already behind a config option: it must be activated with "scan.logical-message.enabled" (default false).

I agree that this is mostly for the use case of using the connector directly, but it is a valid configuration for that.

For the YAML app it will be disabled, and there should be no option to activate it.

@eskabetxe
Member Author

@loserwang1024 @leonardBang could you check?

Contributor

@loserwang1024 loserwang1024 left a comment

I have left some comments.

* pg_logical_emit_message} records (op="m") bypass table-based watermark filtering, since logical
* messages are not bound to a table.
*/
public class PostgresSourceStreamFetcher extends IncrementalSourceStreamFetcher {
Contributor

@loserwang1024 loserwang1024 May 26, 2026

I don't think adding a new subclass PostgresSourceStreamFetcher is a good choice here. Maybe we can move shouldEmit to FetchTask.Context (as with isDataChangeRecord).

By design, the context and the dialect control the behaviors that differ between databases, while IncrementalSourceStreamFetcher holds the shared logic.

Member Author

I did it that way because logical messages are specific to Postgres, but let's see if the change I made is more aligned with what you are asking for.

Contributor

@leonardBang leonardBang May 27, 2026

@eskabetxe @loserwang1024 I'd suggest following the FetchTask.Context approach rather than introducing a PostgresSourceStreamFetcher subclass. Two reasons:

  1. Better cohesion. The Context abstraction already serves as the dialect-specific extension point — isDataChangeRecord() lives there for exactly this reason. Adding shouldEmit() to Context keeps all Postgres-specific decision logic in one place (PostgresSourceFetchTaskContext), rather than splitting it between a new StreamFetcher subclass and the existing context.
  2. Avoids a double-check path. The current implementation introduces shouldEmit(record) && context.shouldEmit(record), which creates ambiguity about which layer owns the emit decision. If we consolidate everything into Context.shouldEmit(), the base IncrementalSourceStreamFetcher would simply call context.shouldEmit(record) with no subclass override needed:
 // IncrementalSourceStreamFetcher — no subclass needed
 if (context.shouldEmit(record)) {
     // emit
 }

 // PostgresSourceFetchTaskContext — single decision point
 @Override
 public boolean shouldEmit(SourceRecord record) {
     if (isLogicalMessage(record)) {
         return matchesPrefix(record, prefixes);
     }
     return super.shouldEmit(record);
 }
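
As a rough, self-contained illustration of the single-decision-point idea, the sketch below uses hypothetical stand-in types (a Map for the record, Context, BaseContext, PostgresContext, Fetcher); none of them are the actual flink-cdc classes. The base check is only a placeholder for the table-based watermark filtering, and prefix matching is left out to keep the delegation visible.

```java
import java.util.Map;

// Hypothetical stand-ins only; these are not the flink-cdc types.
final class ContextEmitSketch {

    interface Context {
        boolean shouldEmit(Map<String, ?> record);
    }

    static class BaseContext implements Context {
        @Override
        public boolean shouldEmit(Map<String, ?> record) {
            // Placeholder for table-based filtering: only records bound to a table pass.
            return record.get("table") != null;
        }
    }

    static class PostgresContext extends BaseContext {
        @Override
        public boolean shouldEmit(Map<String, ?> record) {
            if ("m".equals(record.get("op"))) {
                return true; // logical messages are not bound to a table
            }
            return super.shouldEmit(record);
        }
    }

    /** The shared fetcher asks the context and makes no decision of its own. */
    static final class Fetcher {
        private final Context context;

        Fetcher(Context context) {
            this.context = context;
        }

        boolean emits(Map<String, ?> record) {
            return context.shouldEmit(record);
        }
    }

    public static void main(String[] args) {
        Fetcher postgres = new Fetcher(new PostgresContext());
        Fetcher generic = new Fetcher(new BaseContext());
        System.out.println(postgres.emits(Map.of("op", "m"))); // true
        System.out.println(generic.emits(Map.of("op", "m")));  // false
    }
}
```

The point of the shape is that other dialects keep the base behavior without the shared fetcher knowing anything about op="m".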


static boolean isLogicalMessage(SourceRecord record) {
if (record.value() instanceof Struct) {
Struct struct = (Struct) record.value();
Contributor

@loserwang1024 loserwang1024 May 26, 2026

A minor point of confusion regarding PostgreSQL: WAL changes are subscribed to based on table names. However, the logical messages being read here seem to lack any identifier, making it impossible to distinguish between subscriptions (i.e., different jobs would all read the same data).

I looked into how logical messages are used, and it seems the prefix serves this purpose:

pg_logical_emit_message ( transactional boolean, prefix text, content text [, flush boolean DEFAULT false ] )

Therefore, I do think we need to add a parameter that controls which prefixes are used to select the target logical messages.

The design of this config is the key to this issue. @leonardBang, WDYT?

Member Author

Added prefix filtering, in the same way as we have for the included tables:

logical-message.prefixes

Contributor

@leonardBang @ruanhang1993 please help review the config logical-message.prefixes.

Contributor

Regarding the configuration design: if providing logical message prefixes is mandatory for PG logical decoding to work correctly (i.e., every job that writes pg_logical_emit_message must specify a prefix to differentiate subscriptions), I'd suggest we remove scan.logical-message.enabled entirely and use the presence of scan.logical-message.prefixes as the implicit enablement signal.

  • When scan.logical-message.prefixes is set (non-empty), the feature is implicitly enabled: the user clearly intends to consume logical messages.
  • When scan.logical-message.prefixes is empty or absent, no logical messages are consumed, so the safe default behavior is preserved.
  • This eliminates the risk of a misconfiguration where enabled=true but prefixes is accidentally left empty, which would consume ALL logical messages across subscriptions (a potentially dangerous default).
  • It removes one config knob without losing expressiveness.

 // PostgresSourceFetchTaskContext.shouldEmit()
 if (isLogicalMessage(record)) {
     // Feature is implicitly enabled when prefixes are configured
     return logicalMessagePrefixes.isEmpty() ? false
         : logicalMessagePrefixes.stream().anyMatch(prefix -> matchesPrefix(record, prefix));
 }

The PostgresSourceBuilder API could expose a single method:

builder.includeLogicalMessages(List<String> prefixes)

where passing a non-empty list enables the feature, and omitting the call or passing an empty list keeps it disabled.
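
A small sketch of the implicit-enablement rule, assuming exact matching on the message prefix and a record value shaped like {op, message: {prefix, content}}. Both are assumptions for illustration: the PR's matchesPrefix may behave differently, a Map stands in for the real record type, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the PR's implementation.
final class PrefixFilterSketch {

    /**
     * Decides whether a logical message is emitted. An empty prefix list means the
     * feature is off, so nothing passes. Matching is exact equality (an assumption).
     */
    static boolean acceptsLogicalMessage(Map<String, ?> record, List<String> prefixes) {
        Object message = record.get("message");
        if (!(message instanceof Map)) {
            return false; // no message struct to inspect
        }
        Object prefix = ((Map<?, ?>) message).get("prefix");
        // The instanceof guard also keeps null away from immutable lists,
        // whose contains(null) throws.
        return prefix instanceof String && prefixes.contains(prefix);
    }

    public static void main(String[] args) {
        Map<String, Object> record =
                Map.of("op", "m", "message", Map.of("prefix", "cdc-test", "content", "hello"));

        System.out.println(acceptsLogicalMessage(record, List.of("cdc-test"))); // true
        System.out.println(acceptsLogicalMessage(record, List.of("other")));    // false
        System.out.println(acceptsLogicalMessage(record, List.of()));           // false
    }
}
```

Because an empty list never matches, the separate isEmpty() check is not needed in this form.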

WDYT? @loserwang1024 @eskabetxe

@eskabetxe eskabetxe force-pushed the FLINK-39582 branch 3 times, most recently from 7eb87d5 to 07740ce May 26, 2026 21:44
@loserwang1024
Contributor

@leonardBang @eskabetxe I have modified some code and tests; please help review.

@eskabetxe
Member Author

> @leonardBang @eskabetxe I have modified some code and test, please help review.

@loserwang1024 regarding @leonardBang's comments, I think we are missing some changes. I added them here
so they won't affect the changes you already made. If you think they are OK, I can merge the code into this PR.
