[FLINK-39582][postgres] Allow logical messages #4387
@GOODBOY008 can you help review this?
@eskabetxe, if this is introduced, what happens when the messages emitted via pg_logical_emit_message do not align with the table schema? If you insist on introducing it, please add a configuration option to enable it explicitly. Do not alter the existing logic, so that currently running jobs that work as expected are not disrupted.
Hi @loserwang1024
+1 to introducing a config and exposing it only in the DataStream API. We should always disable this feature for the SQL and YAML APIs, as it will crash the entire pipeline once this kind of event is delivered. WDYT?
@leonardBang this is already behind a config: it must be activated with "scan.logical-message.enabled" (default false). I agree that this is more for the use case of using the connector directly, but it is a valid configuration for that. For the YAML app it will be disabled, and there should be no option to activate it.
@loserwang1024 @leonardBang could you check?
loserwang1024 left a comment
I have left some comments.
 * pg_logical_emit_message} records (op="m") bypass table-based watermark filtering, since logical
 * messages are not bound to a table.
 */
public class PostgresSourceStreamFetcher extends IncrementalSourceStreamFetcher {
I don't think introducing a new subclass, PostgresSourceStreamFetcher, is a good choice here. Maybe we can move shouldEmit to FetchTask.Context (similar to isDataChangeRecord).
By design, the context and the dialect control the behaviors that differ between databases, while IncrementalSourceStreamFetcher shares the same logic across all of them.
I did it that way because logical messages are specific to Postgres, but let's see whether the change I made is more aligned with what you are asking for.
@eskabetxe @loserwang1024 I'd suggest following the FetchTask.Context approach rather than introducing a PostgresSourceStreamFetcher subclass. Two reasons:
- Better cohesion. The Context abstraction already serves as the dialect-specific extension point; isDataChangeRecord() lives there for exactly this reason. Adding shouldEmit() to Context keeps all Postgres-specific decision logic in one place (PostgresSourceFetchTaskContext), rather than splitting it between a new StreamFetcher subclass and the existing context.
- Avoids a double-check path. The current implementation introduces shouldEmit(record) && context.shouldEmit(record), which creates ambiguity about which layer owns the emit decision. If we consolidate everything into Context.shouldEmit(), the base IncrementalSourceStreamFetcher would simply call context.shouldEmit(record), with no subclass override needed:
// IncrementalSourceStreamFetcher: no subclass needed
if (context.shouldEmit(record)) {
    // emit
}
// PostgresSourceFetchTaskContext: single decision point
@Override
public boolean shouldEmit(SourceRecord record) {
    if (isLogicalMessage(record)) {
        return matchesPrefix(record, prefixes);
    }
    return super.shouldEmit(record);
}
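To make the single-decision-point suggestion concrete, here is a minimal, self-contained sketch. Every type in it (SketchRecord, SketchContext, PostgresSketchContext, SketchStreamFetcher) is a hypothetical stand-in rather than a real Flink CDC class, the table check is a simplification of the real watermark filtering, and exact prefix matching is an assumption:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a change record; not Kafka Connect's SourceRecord. */
final class SketchRecord {
    final String op;      // "m" marks a logical message, as in the Javadoc quoted above
    final String table;   // null for logical messages, which are not bound to a table
    final String prefix;  // null for ordinary data change records

    SketchRecord(String op, String table, String prefix) {
        this.op = op;
        this.table = table;
        this.prefix = prefix;
    }
}

/** Stand-in for the generic context: only table-bound filtering (simplified). */
class SketchContext {
    private final Set<String> capturedTables;

    SketchContext(Set<String> capturedTables) {
        this.capturedTables = new HashSet<>(capturedTables);
    }

    boolean shouldEmit(SketchRecord record) {
        return record.table != null && capturedTables.contains(record.table);
    }
}

/** Stand-in for the Postgres context: the only place that knows about logical messages. */
class PostgresSketchContext extends SketchContext {
    private final Set<String> prefixes;

    PostgresSketchContext(Set<String> capturedTables, Set<String> prefixes) {
        super(capturedTables);
        this.prefixes = new HashSet<>(prefixes);
    }

    @Override
    boolean shouldEmit(SketchRecord record) {
        if ("m".equals(record.op)) {
            // Assumption: a message is accepted when its prefix matches exactly.
            return record.prefix != null && prefixes.contains(record.prefix);
        }
        return super.shouldEmit(record);
    }
}

/** Stand-in for the shared stream fetcher: it delegates and needs no subclass. */
final class SketchStreamFetcher {
    private final SketchContext context;

    SketchStreamFetcher(SketchContext context) {
        this.context = context;
    }

    boolean emits(SketchRecord record) {
        return context.shouldEmit(record);
    }
}
```

With this shape the fetcher stays database-agnostic, and swapping in a different context changes the emit decision without touching the fetcher.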
static boolean isLogicalMessage(SourceRecord record) {
    if (record.value() instanceof Struct) {
        Struct struct = (Struct) record.value();
One point of confusion regarding PostgreSQL: WAL changes are subscribed to by table name. However, the logical messages being read now seem to carry no identifier, so subscriptions cannot be told apart (i.e., different jobs would all read the same data).
I looked into how logical messages are used, and it seems that the prefix serves this purpose.
pg_logical_emit_message ( transactional boolean , prefix text , content text [, flush boolean DEFAULT false ] ) Therefore, I think we need a parameter that controls which prefixes are used to filter the target logical messages.
The design of this config is the key to this issue. @leonardBang, WDYT?
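The prefix-based filtering proposed above could look roughly like the sketch below. It models the record value as a plain Map rather than a Kafka Connect Struct so that it runs on its own. The class name LogicalMessagePrefixFilter, the field names "message" and "prefix", and exact (rather than starts-with) matching are assumptions for illustration; only op="m" is taken from the code under review:

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: keeps only logical messages whose prefix is configured. */
final class LogicalMessagePrefixFilter {
    private final List<String> prefixes;

    LogicalMessagePrefixFilter(List<String> prefixes) {
        this.prefixes = List.copyOf(prefixes);
    }

    /** A record value counts as a logical message when its "op" field is "m". */
    static boolean isLogicalMessage(Map<String, Object> value) {
        return value != null && "m".equals(value.get("op"));
    }

    /** Returns true only for logical messages carrying one of the configured prefixes. */
    boolean accepts(Map<String, Object> value) {
        if (!isLogicalMessage(value)) {
            return false;
        }
        // Assumption: the prefix lives in a nested "message" structure.
        Object message = value.get("message");
        if (!(message instanceof Map)) {
            return false;
        }
        Object prefix = ((Map<?, ?>) message).get("prefix");
        return prefix instanceof String && prefixes.contains(prefix);
    }
}
```

Two jobs configured with different prefixes would then each see only their own messages, even though both read the same WAL stream.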
Added prefix filtering, in the same way we do for the included tables:
logical-message.prefixes
@leonardBang @ruanhang1993 please help review the config logical-message.prefixes
Regarding the configuration design: if providing logical message prefixes is mandatory for PG logical decoding to work correctly (i.e., every job that writes pg_logical_emit_message must specify a prefix to differentiate subscriptions), I'd suggest we remove scan.logical-message.enabled entirely and use the presence of scan.logical-message.prefixes as the implicit enablement signal.
- When scan.logical-message.prefixes is set (non-empty), the feature is implicitly enabled: the user clearly intends to consume logical messages.
- When scan.logical-message.prefixes is empty or absent, no logical messages are consumed, so the safe default behavior is preserved.
- This eliminates the risk of misconfiguration where enabled=true but prefixes is accidentally left empty (which would consume ALL logical messages across subscriptions, a potentially dangerous default).
- It removes one config knob without losing expressiveness.
// PostgresSourceFetchTaskContext.shouldEmit()
if (isLogicalMessage(record)) {
    // Feature is implicitly enabled when prefixes are configured
    return !logicalMessagePrefixes.isEmpty()
            && logicalMessagePrefixes.stream().anyMatch(prefix -> matchesPrefix(record, prefix));
}
The PostgresSourceBuilder API could expose a single method:
builder.includeLogicalMessages(List<String> prefixes)
where passing a non-empty list enables the feature, and omitting the call or passing an empty list keeps it disabled.
WDYT? @loserwang1024 @eskabetxe
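As an illustration of the implicit-enablement idea, the sketch below shows a builder in which a non-empty prefix list is the only switch. SketchPostgresSourceBuilder and its query methods are hypothetical names; only includeLogicalMessages(List<String> prefixes) comes from the suggestion above, and exact prefix matching is an assumption:

```java
import java.util.List;

/** Hypothetical builder: a non-empty prefix list is the only enablement signal. */
final class SketchPostgresSourceBuilder {
    // Default: no prefixes configured, so no logical messages are consumed.
    private List<String> logicalMessagePrefixes = List.of();

    SketchPostgresSourceBuilder includeLogicalMessages(List<String> prefixes) {
        // A null or empty list keeps the feature disabled.
        this.logicalMessagePrefixes = prefixes == null ? List.of() : List.copyOf(prefixes);
        return this;
    }

    /** Derived from the prefix list; there is no separate "enabled" flag to get out of sync. */
    boolean logicalMessagesEnabled() {
        return !logicalMessagePrefixes.isEmpty();
    }

    /** Assumption: a message is emitted when its prefix equals a configured one. */
    boolean shouldEmitLogicalMessage(String prefix) {
        return prefix != null && logicalMessagePrefixes.contains(prefix);
    }
}
```

Because enablement is derived rather than stored, the enabled-but-no-prefixes state described in the third bullet cannot be expressed at all.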
@leonardBang @eskabetxe I have modified some code and tests, please help review.
@loserwang1024 I think we are still missing some of the changes @leonardBang asked for. I added them here.
What is the purpose of the change
Enable Postgres logical decoding messages produced by pg_logical_emit_message(transactional, prefix, content) to flow through to the user's DebeziumDeserializationSchema. Today these records are silently dropped before reaching deserialization. See [FLINK-39582] for the problem statement and root cause.
Brief change log
Base extension hooks (no domain logic; pure extensibility):
Postgres connector:
Configuration:
Design notes
Verifying this change
Added tests under flink-connector-postgres-cdc/src/test/java/.../source/reader/: