[FLINK-39616][docs] Document ORDER BY usage with FROM_CHANGELOG #28166
raminqaf wants to merge 2 commits into
less than or equal to the watermark are delivered to the eval() method in sorted order. Late
events (arriving after the watermark) are dropped to maintain the ordering guarantee.

This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which converts an append-only CDC stream into a changelog. Combining `PARTITION BY` on the primary key with `ORDER BY` on the event-time column reorders out-of-order CDC events per key before the operations are applied.
Suggested change:
- This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which converts an append-only CDC stream into a changelog. Combining `PARTITION BY` on the primary key with `ORDER BY` on the event-time column reorders out-of-order CDC events per key before the operations are applied.
+ This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which converts an append-only CDC stream into a changelog. Combining `PARTITION BY` on the primary key with `ORDER BY` on the event-time column reorders out-of-order CDC events per key before the conversion operations are applied.
CDC streams can deliver events out of order. For example, a key's `UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are partitioned across upstream brokers. If the source itself does not guarantee ordering, applying such a changelog directly produces incorrect state.

`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref "docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each partition before they are processed. The framework buffers events per partition and flushes them to the function in sorted order once the watermark advances. Late events (arriving after the watermark) are dropped.
Could you explain what happens to a record whose matching record is dropped?
For example, what happens to an `UPDATE_AFTER` that wasn't late if its matching `UPDATE_BEFORE` is dropped?
Added a section where I describe the behavior in retract/upsert mode
op STRING,
name STRING,
event_time TIMESTAMP_LTZ(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
Suggested change:
- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+ WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
To keep it aligned with the example below?
Thanks for pointing this out. Addressed
| Incoming row | Current watermark | Outcome |
|---|---|---|
| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted later when the watermark passes `10:05`. |
| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. Timestamp is below the current watermark. |
Suggested change:
- | `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. Timestamp is below the current watermark. |
+ | `+I[id: 5, op: 'INSERT', event_time: '09:57']` | `10:00` | Dropped. Timestamp is below the current watermark. |
Otherwise, readers might think the 5-minute difference between 09:55 and 10:00 has some relevance.
|---|---|---|
| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted later when the watermark passes `10:05`. |
| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. Timestamp is below the current watermark. |
| `+I[id: 7, op: 'INSERT', event_time: '10:11']` | `10:06` | Record `id=6` is emitted; this row is buffered until the watermark passes `10:11`. |
To be precise, the watermark is only advanced after the record has been processed,
so "Current watermark" shouldn't show `10:06` yet.
The advance happens when a new watermark message is received and processed by the PTF.
Good one! Updated the table to make it more precise and clear. Hope it is good now!
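For anyone following this thread, the point about watermark timing can be illustrated with a small toy model. This is only a sketch in plain Java, not Flink's implementation: the class `OrderByBufferSketch`, its `Event` type, and the methods `onEvent` and `onWatermark` are hypothetical names. It assumes a row counts as late when its timestamp is less than or equal to the current watermark, and it encodes times as minutes since midnight. Receiving a row never moves the watermark; only a watermark message does, and that is also when buffered rows are released in sorted order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of watermark-driven sorting for one partition. Not Flink code. */
final class OrderByBufferSketch {

    /** Hypothetical CDC row, reduced to the fields shown in the example table. */
    static final class Event {
        final int id;
        final String op;
        final long eventTime; // minutes since midnight, e.g. 10:05 -> 605

        Event(int id, String op, long eventTime) {
            this.id = id;
            this.op = op;
            this.eventTime = eventTime;
        }
    }

    // Rows waiting for the watermark, ordered by event time.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.eventTime));
    private long watermark = Long.MIN_VALUE;

    /** Buffers the row, or returns false if it is late (assumed: eventTime <= watermark). */
    boolean onEvent(Event e) {
        if (e.eventTime <= watermark) {
            return false; // dropped; the watermark itself is unchanged
        }
        buffer.add(e);
        return true;
    }

    /** Only a watermark message advances time and releases rows in sorted order. */
    List<Event> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().eventTime <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        OrderByBufferSketch partition = new OrderByBufferSketch();
        partition.onWatermark(600); // watermark at 10:00
        System.out.println(partition.onEvent(new Event(6, "INSERT", 605))); // buffered
        System.out.println(partition.onEvent(new Event(5, "INSERT", 595))); // dropped as late
        System.out.println(partition.onEvent(new Event(7, "INSERT", 671))); // buffered
        for (Event e : partition.onWatermark(606)) { // watermark message for 10:06
            System.out.println("emit id=" + e.id);
        }
    }
}
```

In this model the row with `id=7` is received while the watermark is still `10:00`; `id=6` is released only by the later watermark message, which matches the distinction raised in the review comment above.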
LGTM
What is the purpose of the change
CDC streams often deliver events out of order. The PTF ORDER BY clause introduced in FLINK-39256 aligns naturally with FROM_CHANGELOG: partitioning by the primary key and ordering by event time reorders out-of-order CDC events per key before the operations are applied.
Adds a new "Ordering CDC events with ORDER BY" subsection to the FROM_CHANGELOG documentation with the SQL example and the requirements (watermarked time attribute, ASC on the first order column, set semantics via PARTITION BY). Extends the Table API examples with the equivalent partitionBy(...).orderBy(...).fromChangelog() pattern.
Adds a corresponding example in the PartitionedTable#fromChangelog Javadoc.
Adds a cross-reference from the PTF ORDER BY guide to FROM_CHANGELOG so users discover the pattern from either side.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Opus 4.7