Skip to content

[FLINK-39538][table] Support upsert output mode for FROM_CHANGELOG#28164

Merged
fhueske merged 6 commits into
apache:masterfrom
raminqaf:FLINK-39538
May 20, 2026
Merged

[FLINK-39538][table] Support upsert output mode for FROM_CHANGELOG#28164
fhueske merged 6 commits into
apache:masterfrom
raminqaf:FLINK-39538

Conversation

@raminqaf
Copy link
Copy Markdown
Contributor

What is the purpose of the change

FROM_CHANGELOG now emits an upsert changelog (INSERT, UPDATE_AFTER, full DELETE) when the input table is partitioned (set semantics via PARTITION BY) and the active op_mapping maps to UPDATE_AFTER without UPDATE_BEFORE. The partition key acts as the upsert key. In all other cases the output remains a retract changelog.

Submitting an op_mapping with UPDATE_AFTER but no UPDATE_BEFORE without PARTITION BY is rejected at validation time, because upsert mode requires a key.

To enable the strategy to inspect the resolved op_mapping and the input table's partition keys, ChangelogFunction.ChangelogContext is extended with two default methods: getArgumentValue(int, Class) and getTableSemantics(int). Defaults return Optional.empty() to preserve source compatibility for existing implementations.

The planner-side wrapper in FlinkChangelogModeInferenceProgram delegates the two new methods to the underlying CallContext.

Upsert mode uses full deletes (ChangelogMode.upsert(false)) because the runtime forwards each input delete row with all fields populated; only the RowKind is rewritten. This matches the runtime's behavior and avoids forcing downstream operators to reconstruct rows from state.

The upsert key derivation in FlinkRelMdUniqueKeys.getPtfUniqueKeys already returns the partition columns when a PTF emits upsert, so no metadata changes are needed.

Verifying this change

Changes are backed by semantic tests and plan verification tests

  • Added semantics tests in FromChangelogTestProgram
  • Added tests in FromChangelogTest to verify generated plan and upsertKeys

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs )

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Opus 4.7

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 15, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @raminqaf

I've left a few comments.

Cheers, Fabian

Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Copy link
Copy Markdown
Contributor

@gustavodemorais gustavodemorais left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR in general looks very good, @raminqaf!

I've added multiple comments and doc suggestions and only one change requested.

Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
@raminqaf raminqaf force-pushed the FLINK-39538 branch 5 times, most recently from 7da043f to 41ce0e3 Compare May 18, 2026 15:30
Copy link
Copy Markdown
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update @raminqaf!

PR is good to merge.
Left one comment with an improvement suggestion.

Fabian

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label May 18, 2026
Copy link
Copy Markdown
Contributor

@gustavodemorais gustavodemorais left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the feedback, @raminqaf! Take a look

Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
raminqaf and others added 5 commits May 19, 2026 16:52
FROM_CHANGELOG now emits an upsert changelog (INSERT, UPDATE_AFTER, full DELETE) when the input table is partitioned (set semantics via PARTITION BY) and the active op_mapping maps to UPDATE_AFTER without UPDATE_BEFORE. The partition key acts as the upsert key. In all other cases the output remains a retract changelog.

Submitting an op_mapping with UPDATE_AFTER but no UPDATE_BEFORE without PARTITION BY is rejected at validation time, because upsert mode requires a key.

To enable the strategy to inspect the resolved op_mapping and the input table's partition keys, ChangelogFunction.ChangelogContext is extended with two default methods: getArgumentValue(int, Class) and getTableSemantics(int). Defaults return Optional.empty() to preserve source compatibility for existing implementations.

The planner-side wrapper in FlinkChangelogModeInferenceProgram delegates the two new methods to the underlying CallContext.

Upsert mode uses full deletes (ChangelogMode.upsert(false)) because the runtime forwards each input delete row with all fields populated; only the RowKind is rewritten. This matches the runtime's behavior and avoids forcing downstream operators to reconstruct rows from state.

The upsert key derivation in FlinkRelMdUniqueKeys.getPtfUniqueKeys already returns the partition columns when a PTF emits upsert, so no metadata changes are needed.
Copy link
Copy Markdown
Contributor

@gustavodemorais gustavodemorais left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing the feedback, @raminqaf! Last two nits and we should be good to go

Copy link
Copy Markdown
Contributor

@gustavodemorais gustavodemorais left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing all the feedback. LGTM, @raminqaf

@fhueske
Copy link
Copy Markdown
Contributor

fhueske commented May 20, 2026

Merging this PR

@fhueske fhueske merged commit 4644cab into apache:master May 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants