Skip to content

[FLINK-39616][docs] Document ORDER BY usage with FROM_CHANGELOG#28166

Open
raminqaf wants to merge 2 commits into
apache:masterfrom
raminqaf:FLINK-39616
Open

[FLINK-39616][docs] Document ORDER BY usage with FROM_CHANGELOG#28166
raminqaf wants to merge 2 commits into
apache:masterfrom
raminqaf:FLINK-39616

Conversation

@raminqaf
Copy link
Copy Markdown
Contributor

What is the purpose of the change

CDC streams often deliver events out of order. The PTF ORDER BY clause introduced in FLINK-39256 aligns naturally with FROM_CHANGELOG: partitioning by the primary key and ordering by event time reorders out-of-order CDC events per key before the operations are applied.

Adds a new "Ordering CDC events with ORDER BY" subsection to the FROM_CHANGELOG documentation with the SQL example and the requirements (watermarked time attribute, ASC on the first order column, set semantics via PARTITION BY). Extends the Table API examples with the equivalent partitionBy(...).orderBy(...).fromChangelog() pattern.

Adds a corresponding example in the PartitionedTable#fromChangelog Javadoc.

Adds a cross-reference from the PTF ORDER BY guide to FROM_CHANGELOG so users discover the pattern from either side.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (docs / JavaDocs)

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Opus 4.7

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 15, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Copy Markdown
Contributor

@fhueske fhueske left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @raminqaf!

Left a few comments with improvement suggestions.

Cheers, Fabian

less than or equal to the watermark are delivered to the eval() method in sorted order. Late
events (arriving after the watermark) are dropped to maintain the ordering guarantee.

This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which converts an append-only CDC stream into a changelog. Combining `PARTITION BY` on the primary key with `ORDER BY` on the event-time column reorders out-of-order CDC events per key before the operations are applied.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which converts an append-only CDC stream into a changelog. Combining `PARTITION BY` on the primary key with `ORDER BY` on the event-time column reorders out-of-order CDC events per key before the operations are applied.
This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which converts an append-only CDC stream into a changelog. Combining `PARTITION BY` on the primary key with `ORDER BY` on the event-time column reorders out-of-order CDC events per key before the conversion operations are applied.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed


CDC streams can deliver events out of order. For example, a key's `UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are partitioned across upstream brokers. If the source itself does not guarantee ordering, applying such a changelog directly produces incorrect state.

`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref "docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each partition before they are processed. The framework buffers events per partition and flushes them to the function in sorted order once the watermark advances. Late events (arriving after the watermark) are dropped.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain what happens with a record that matches a dropped record?
For example what happens to an UPDATE_AFTER that wasn't late if its matching UPDATE_BEFORE is dropped?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a section where I describe the behavior in retract/upsert mode

op STRING,
name STRING,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE

to keep it aligned with the example below?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. Addressed

| Incoming row | Current watermark | Outcome |
|---|---|---|
| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted later when the watermark passes `10:05`. |
| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. Timestamp is below the current watermark. |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. Timestamp is below the current watermark. |
| `+I[id: 5, op: 'INSERT', event_time: '09:57']` | `10:00` | Dropped. Timestamp is below the current watermark. |

Otherwise, readers might think that the 5 min difference between 9:55 and 10:00 would have any relevance.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!

|---|---|---|
| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted later when the watermark passes `10:05`. |
| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. Timestamp is below the current watermark. |
| `+I[id: 7, op: 'INSERT', event_time: '10:11']` | `10:06` | Record `id=6` is emitted; this row is buffered until the watermark passes `10:11`. |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be precise, the watermark would only be incremented after the record was processed.
So "Current Watermark" shouldn't be incremented to 10:06 yet.
This happens when a new WM message is received and processed by the PTF.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goode one! Updated the table and made it more precise and clear. Hope it is good now!

raminqaf and others added 2 commits May 16, 2026 09:58
CDC streams often deliver events out of order. The PTF ORDER BY clause introduced in FLINK-39256 aligns naturally with FROM_CHANGELOG: partitioning by the primary key and ordering by event time reorders out-of-order CDC events per key before the operations are applied.

Adds a new "Ordering CDC events with ORDER BY" subsection to the FROM_CHANGELOG documentation with the SQL example and the requirements (watermarked time attribute, ASC on the first order column, set semantics via PARTITION BY). Extends the Table API examples with the equivalent partitionBy(...).orderBy(...).fromChangelog() pattern.

Adds a corresponding example in the PartitionedTable#fromChangelog Javadoc.

Adds a cross-reference from the PTF ORDER BY guide to FROM_CHANGELOG so users discover the pattern from either side.
@featzhang
Copy link
Copy Markdown
Member

LGTM

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label May 16, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants