
Implement CHANGES table function for changelog reads #36869

Draft
antiguru wants to merge 12 commits into main from claude/bold-bell-9lkYI


@antiguru antiguru commented Jun 2, 2026

Motivation

Closes #4527

Users want to consume the change stream of a collection as a relation they can transform with SQL. SUBSCRIBE already exposes this stream, but only as a top-level streaming statement whose output flows directly to the client — it cannot be wrapped in a SELECT, aggregated, joined, or materialized.

This PR implements the CHANGES table function, which reads a collection's changelog as an append-only relation with the per-update timestamp and diff available as ordinary mz_timestamp / mz_diff columns.

Syntax

CHANGES(<relation> AS OF [AT LEAST] <bound>)
  • <relation> is a collection named directly or a parenthesized subquery (mirroring SUBSCRIBE). A subquery must reduce to a bare read of a single persist-backed object (table, source, materialized view); anything that filters/transforms is the deferred arbitrary-expression case and is rejected. A subquery transparently reads through a non-materialized view to its underlying collection.
  • AS OF vs AS OF AT LEAST: strict (error if the requested history is unavailable) vs advisory (age the bound in from the input's since, i.e. clamp it up to since). Reuses the existing AsOf grammar; the parentheses around a subquery delimit it from the trailing AS OF.
  • fixed vs sliding bound — a constant timestamp is a static changelog start (append-only); an mz_now()-relative bound is a sliding window that trails real time (the trailing edge retracts).

The shape of the bound signals the semantics. The design deliberately reuses AS OF in Materialize's sense (a logical-time read hold, distinct from SQL:2011's AS OF), which resolves the design doc's "parameter keyword" question.

Where it is allowed

| Context | Fixed bound | Sliding (mz_now()-relative) bound |
| --- | --- | --- |
| One-off SELECT | ✅ implemented | ✅ implemented (as_of = query_time − lag; window [query_time − lag, query_time]) |
| Materialized view / index | ❌ rejected (would pin since indefinitely) | ⛔ not yet wired (needs lagging read policy + sliding output filter) |
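
The matrix above can be read as a single match over two axes. The following is a minimal sketch of that gating logic, not the planner's actual code: the `Lifetime`, `BoundShape`, and `Gate` types and the `gate` function are hypothetical names introduced only for illustration.

```rust
/// Hypothetical: where the CHANGES call appears.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Lifetime {
    OneOffSelect,
    Maintained, // materialized view or index
}

/// Hypothetical: the shape of the AS OF bound.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BoundShape {
    Fixed,   // constant timestamp
    Sliding, // mz_now()-relative
}

/// Hypothetical outcome of gating.
#[derive(Debug, PartialEq, Eq)]
enum Gate {
    Allowed,
    Rejected(&'static str),
    NotYetSupported(&'static str),
}

/// Mirrors the lifetime x bound table from the PR description.
fn gate(lifetime: Lifetime, bound: BoundShape) -> Gate {
    match (lifetime, bound) {
        // One-off SELECTs are wired for both bound shapes.
        (Lifetime::OneOffSelect, _) => Gate::Allowed,
        // A fixed lower bound on a durable object would pin `since` forever.
        (Lifetime::Maintained, BoundShape::Fixed) => {
            Gate::Rejected("a fixed bound would pin the input's since indefinitely")
        }
        // Passes gating in principle, but execution is not wired yet.
        (Lifetime::Maintained, BoundShape::Sliding) => {
            Gate::NotYetSupported("needs a lagging read policy and a sliding output filter")
        }
    }
}
```

Because the match is exhaustive over both enums, adding a new lifetime or bound shape in this sketch would force a decision for every new cell of the table.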

Implementation

  • Parser / AST (src/sql-parser): TableFactor::Changes { relation: ChangesRelation, as_of: AsOf, .. }, where ChangesRelation is Name | Query (mirrors SubscribeRelation). AS OF [AT LEAST] clause required.
  • Planner (src/sql/src/plan/): plan_changes resolves the relation (name, or subquery reduced to a bare global Get) to a single persist-backed collection, classifies the bound (strict/advisory, fixed/sliding), gates by lifetime, and builds the changelog schema (input columns + mz_timestamp + mz_diff). Feature-flagged by enable_changes_table_function.
  • IR & lowering (src/expr, src/sql/src/plan/): MirRelationExpr::Changes { id, typ, bound: MirScalarExpr } — an opaque leaf carrying the lower-bound scalar; lowers to a Get of the source import with read_as_changelog set.
  • Coordinator (src/adapter/src/optimize/peek.rs): evaluates each changelog bound at the query time (resolving mz_now()) to pin the dataflow as_of, and marks the source imports.
  • Compute (src/compute/src/render.rs): consolidates the source (snapshot collapses to as_of), then reinterprets each (row, time, diff) as the append ((row, time, diff), max(time, as_of), 1) — append-only, matching SUBSCRIBE's diff format.
  • Optimizer (src/transform/): Changes is an opaque barrier leaf in all analyses; nothing is pushed into or through it.
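
To illustrate the Compute step, here is a small sketch of the reinterpretation under simplifying assumptions: rows are `Vec<i64>`, timestamps are `u64`, and consolidation is done with a `BTreeMap` rather than a dataflow operator. The `to_changelog` function and the type aliases are hypothetical and are not the PR's `pack_changelog_row`.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the real row, timestamp, and diff types.
type Row = Vec<i64>;
type Time = u64;
type Diff = i64;

/// One output record: the original update packed as data, emitted with diff +1.
type ChangelogUpdate = ((Row, Time, Diff), Time, Diff);

/// Advance every update to at least `as_of`, consolidate, then re-emit each
/// surviving update as an append.
fn to_changelog(updates: &[(Row, Time, Diff)], as_of: Time) -> Vec<ChangelogUpdate> {
    // Sum diffs per (row, advanced time) so the snapshot collapses to `as_of`.
    let mut consolidated: BTreeMap<(Row, Time), Diff> = BTreeMap::new();
    for (row, time, diff) in updates {
        let time = (*time).max(as_of);
        *consolidated.entry((row.clone(), time)).or_insert(0) += diff;
    }
    consolidated
        .into_iter()
        // Updates that cancel out before `as_of` never reach the changelog.
        .filter(|(_, diff)| *diff != 0)
        // The original diff moves into the data; the output diff is always +1.
        .map(|((row, time), diff)| ((row, time, diff), time, 1))
        .collect()
}
```

Consolidating before packing matters: once the diff is part of the row data, an insert and its matching delete are distinct rows and can no longer cancel.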

Verification

  • test/sqllogictest/changes.slt: feature gating, argument validation (persist-backed only; views/CTEs rejected), bound validation, name-and-subquery forms, durable-lifetime gating, sliding mz_now() plan+run.
  • test/testdrive/changes.td: data round-trip — snapshot at as_of plus later inserts/deletes appear as appends with correct mz_diff; sliding-bound and subquery forms.
  • Parser round-trip tests; pack_changelog_row unit tests.

Follows the design doc (doc/developer/design/20260602_changes_table_function.md).

Follow-ups (tracked in the design doc's open questions)

  • Maintained (materialized view) sliding-bound execution: sliding output temporal filter + lagging read policy on the input.
  • Advisory (AS OF AT LEAST) clamping: age the bound up to the input's since rather than erroring.
  • Arbitrary-expression CHANGES (generalize the subquery form beyond a bare read) — the time-invariance / optimizer-barrier work.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M

claude added 12 commits June 2, 2026 10:13
Foundation for the CHANGES(<collection>, <as_of>) table function (reads a
collection as an append-only changelog). This lands the verified, independently
testable pieces; the IR/execution wiring follows.

- Compute reinterpretation (highest-risk core): pack_changelog_row + an
  append-only changelog transform in mz_compute::render, gated by the new
  SourceImport::read_as_changelog flag. Consolidates before packing the diff
  into a column so the snapshot collapses to as_of. Unit-tested.
- SourceImport carries read_as_changelog through compute-types/compute-client;
  DataflowDescription::set_source_read_as_changelog lets the coordinator mark a
  changelog source import. import_source gains the flag (default false).
- Parser/AST: non-reserved CHANGES keyword + TableFactor::Changes, parsed in
  parse_table_factor, with display + roundtrip tests. Name resolver handles it.
- Planner: plan_changes gates on the feature flag, requires a persist-backed
  object (table/source/materialized view), and evaluates the constant as_of;
  execution wiring is bail_unsupported for now.
- Feature flag enable_changes_table_function.

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Adds the dedicated, optimizer-opaque MirRelationExpr::Changes node and wires it
through the IR pipeline (the SQL surface now plans through to LIR):

- mz-expr: MirRelationExpr::Changes { id, typ, as_of } leaf; handled in typ,
  arity, keys, visitors, depends_on, and EXPLAIN. No new proto/Arbitrary.
- mz-transform: Changes treated as an opaque barrier leaf across analyses and
  transforms (registers nothing in gets maps). The typechecker accepts its
  extended (input + mz_timestamp + mz_diff) type as-is without cross-checking
  against the catalog type of id.
- mz-compute-types: LIR lowering lowers Changes to a Get of the (changelog-
  marked) source import; rendering already appends the columns and emits +1.
- mz-sql: HirRelationExpr::Changes + HIR->MIR lowering; plan_changes now builds
  the extended RelationDesc and produces the node (no longer bails).

Remaining: coordinator wiring (mark the source import read_as_changelog with the
extended type, set the dataflow as_of to the changelog as_of, take the read
hold, and peek at the latest time).

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
In the peek optimizer's finalize, scan the dataflow for MirRelationExpr::Changes
nodes, mark the corresponding source imports read_as_changelog, and pin the
dataflow as_of to the earliest changelog start (so the snapshot is taken there
and later changes replay as appends; the peek still happens at the determined
timestamp). The dataflow's implied read hold pins the input's since at the
changelog as_of for the query's lifetime.

This completes the one-off SELECT ... FROM CHANGES(coll, as_of) path.

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
- test/sqllogictest/changes.slt: deterministic plan-time coverage (feature
  gating; rejection of views, CTEs, unmaterializable/negative as_of).
- test/testdrive/changes.td: data round-trip. Captures a valid as_of via
  mz_now(), then asserts the snapshot collapses to as_of and later changes
  append (insert as +1, delete as a row with mz_diff = -1), and that CHANGES
  composes with GROUP BY/aggregation. Asserts data + mz_diff only, since the
  per-update timestamp is non-deterministic.

https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Replace the comma-separated `CHANGES(name, expr)` argument with an
`AS OF [AT LEAST] <bound>` clause that mirrors SUBSCRIBE's surface. The
bound now varies along two orthogonal axes:

  * strict (`AS OF`) vs. advisory (`AS OF AT LEAST`), carried by AsOf
  * fixed (constant) vs. sliding (`mz_now()`-relative), detected via
    contains_temporal()

Gate by lifetime: a durable maintained object (materialized view,
index) requires a sliding bound, since a fixed lower bound would hold
the input's `since` open indefinitely. A sliding bound parses and
passes gating, but its dataflow wiring (output temporal filter + lagging
read policy) is not yet implemented, so it currently bails as unsupported.
The fixed-bound one-off SELECT path is unchanged.

Update the design doc with the two-axis model, the pruned
lifetime x bound matrix, and the resolved parameter-keyword question.

Carry the changelog lower bound as an mz_timestamp-typed MirScalarExpr on the
Changes node instead of a pre-folded Timestamp, and evaluate it in the peek
coordinator at the query time (resolving mz_now()). A fixed bound folds to a
constant; a sliding mz_now()-relative bound yields as_of = query_time - lag, so
a one-off SELECT reads the window [query_time - lag, query_time] with no output
temporal filter needed (the peek time is the upper edge).

Gating: only one-off SELECTs are wired for execution; durable maintained
objects with a fixed bound are rejected (indefinite hold), and other non-SELECT
uses bail as unsupported. The maintained sliding case (lagging read policy)
remains a follow-up.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The CHANGES relation argument can now be either a collection named directly or
a parenthesized subquery, mirroring SUBSCRIBE's surface. The AST gains a
ChangesRelation { Name, Query } enum; the planner resolves either form to the
single persist-backed collection it reads. A subquery must reduce to a bare
read (identity projection / empty map over a global Get) of a table, source, or
materialized view; anything that filters or transforms is the deferred
arbitrary-expression case and is rejected as unsupported. Unlike the name form,
a subquery transparently reads through a non-materialized view (which inlines)
to its underlying persist collection.
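
The "bare read" check described above can be sketched on a toy relation type. This is an illustration only: the `Rel` enum, its fields, and the `arity` and `bare_read` helpers are hypothetical and far simpler than the planner's real expression types.

```rust
/// Hypothetical, heavily simplified relation expression.
#[derive(Debug)]
enum Rel {
    Get { id: u64, columns: usize },
    Project { input: Box<Rel>, outputs: Vec<usize> },
    Map { input: Box<Rel>, scalars: Vec<String> },
    Filter { input: Box<Rel>, predicate: String },
}

/// Number of output columns of `rel`.
fn arity(rel: &Rel) -> usize {
    match rel {
        Rel::Get { columns, .. } => *columns,
        Rel::Project { outputs, .. } => outputs.len(),
        Rel::Map { input, scalars } => arity(input) + scalars.len(),
        Rel::Filter { input, .. } => arity(input),
    }
}

/// Returns the id of the collection if `rel` is a bare read: a Get, possibly
/// wrapped in identity projections or empty maps. Anything else is rejected.
fn bare_read(rel: &Rel) -> Option<u64> {
    match rel {
        Rel::Get { id, .. } => Some(*id),
        Rel::Project { input, outputs } => {
            // Identity projection: outputs are exactly 0..arity(input), in order.
            let identity = outputs.iter().copied().eq(0..arity(input));
            if identity { bare_read(input) } else { None }
        }
        Rel::Map { input, scalars } if scalars.is_empty() => bare_read(input),
        // Filters, non-empty maps, and everything else transform the input.
        _ => None,
    }
}
```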

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Thread the strict/advisory distinction (AS OF vs AS OF AT LEAST) onto the
Changes IR node, and the inputs' read frontier (determination.since) into the
peek optimizer. When resolving the changelog as_of at the query time, an
advisory bound is clamped up to `since` (aging in to the earliest available
history rather than erroring), while a strict bound is pinned as written (if it
precedes `since`, dataflow creation errors, surfacing the unavailable history).

The peek's existing read holds already pin `since` for the query duration, so
no new hold is needed; the dataflow as_of must clear `since` anyway, making the
clamp both correct and required. Non-peek callers (frontend_peek SELECT path)
thread the same frontier; explain-only paths (insights) pass a minimal since.
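
As a rough sketch of the resolution rule described above, assuming `u64` timestamps and modelling the strict failure as an explicit error (in the PR, the error surfaces later at dataflow creation): the `Bound`, `Strictness`, and `AsOfError` types and the `resolve_as_of` function are hypothetical names.

```rust
type Time = u64;

/// Hypothetical: a fixed bound is a constant; a sliding bound trails the query time.
#[derive(Debug, Clone, Copy)]
enum Bound {
    Fixed(Time),
    Sliding { lag: Time },
}

/// Hypothetical: AS OF (strict) vs AS OF AT LEAST (advisory).
#[derive(Debug, Clone, Copy)]
enum Strictness {
    Strict,
    Advisory,
}

#[derive(Debug, PartialEq, Eq)]
enum AsOfError {
    HistoryUnavailable { requested: Time, since: Time },
}

/// Evaluate the bound at `query_time`, then apply the strict/advisory rule
/// against the input's read frontier `since`.
fn resolve_as_of(
    bound: Bound,
    strictness: Strictness,
    query_time: Time,
    since: Time,
) -> Result<Time, AsOfError> {
    let requested = match bound {
        Bound::Fixed(t) => t,
        // as_of = query_time - lag, saturating at zero in this sketch.
        Bound::Sliding { lag } => query_time.saturating_sub(lag),
    };
    match strictness {
        // Advisory: age in to the earliest available history.
        Strictness::Advisory => Ok(requested.max(since)),
        // Strict: the bound is used as written, so unavailable history is an error.
        Strictness::Strict if requested < since => {
            Err(AsOfError::HistoryUnavailable { requested, since })
        }
        Strictness::Strict => Ok(requested),
    }
}
```

For example, with `since = 20` and `query_time = 100`, an advisory `Fixed(5)` resolves to 20, a strict `Fixed(5)` fails, and a strict `Sliding { lag: 30 }` resolves to 70.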

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The Changes variant was missing an arm in MreDiff (the structured-diff backing
MirRelationExpr::eq), so two identical Changes nodes compared unequal via eq
while cmp reported them equal, tripping the eq/cmp consistency soft-assert. In
CI's debug build this panicked the optimizer (its fixpoint compares an
expression to its prior form), so every CHANGES execution failed. Add the
Changes arm and a regression test asserting eq agrees with cmp.
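
The invariant behind this fix can be shown on a toy type. The sketch below assumes a hand-written `eq` next to a derived `cmp`, which is only an approximation of how MreDiff backs `MirRelationExpr::eq`; `Expr` and `eq_agrees_with_cmp` are hypothetical names.

```rust
use std::cmp::Ordering;

/// Hypothetical two-variant expression with a derived ordering and a manual `eq`.
#[derive(Debug, Clone, PartialOrd, Ord, Eq)]
enum Expr {
    Get(u64),
    Changes { id: u64 },
}

impl PartialEq for Expr {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Expr::Get(a), Expr::Get(b)) => a == b,
            // Without this arm, two identical Changes nodes would fall through
            // to the catch-all and compare unequal while `cmp` says Equal.
            (Expr::Changes { id: a }, Expr::Changes { id: b }) => a == b,
            _ => false,
        }
    }
}

/// The consistency property a regression test can assert.
fn eq_agrees_with_cmp(a: &Expr, b: &Expr) -> bool {
    (a == b) == (a.cmp(b) == Ordering::Equal)
}
```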

Also:
- Exclude changes.slt from --auto-index-selects: CHANGES is rejected in an
  index/materialized-view context, so wrapping its SELECTs in an indexed view
  is intentionally inconsistent.
- Shorten the durable-lifetime gating error to a one-line message and move the
  mz_now()-relative suggestion into a hint (new PlanError::ChangesRequiresSlidingBound).

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
- Design doc: add "Maintained materialized views: sliding execution" describing
  the intended (not-yet-implemented) MV path — gating, a lagging read policy on
  the input, a continuous output temporal filter, and bounded-restart
  semantics — and why it is deferred until it can be runtime-tested as a unit.
- testdrive: add a CHANGES-over-materialized-view round-trip, covering a
  non-table persist-backed input and changes flowing through the MV.

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
…s leaf

The NonNegative analysis used a `_ => results[index - 1]` catch-all for
single-input operators, with explicit arms only for the `Constant` and `Get`
leaves. The `Changes` leaf was never given an arm, so it fell into the
catch-all: for a `Changes` node at post-order index 0 (e.g. `SELECT a, mz_diff
FROM CHANGES(t)`, which optimizes to `Project(Changes)`), that computes
`results[0 - 1]` = `results[usize::MAX]` and panics the optimizer with an
out-of-bounds index ("the len is 2 but the index is 18446744073709551615").
A `count(*)` plan hit the same underflow with a larger node count.

Add an explicit `Changes => false` leaf arm: a changelog read is conservatively
treated as possibly-retracting (the maintained sliding case ages rows out of
the window), and, like Constant/Get, never reaches the indexing catch-all.

Other Changes arms (eq/cmp, monotonic, equivalences, arity, types, keys, column
names, cardinality) were added when the variant was introduced because those
matches are exhaustive; only NonNegative's catch-all let the leaf slip through.

Add a regression test running the analysis on `Project(Changes)`.
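
The failure mode and the fix can be reproduced on a toy post-order analysis. This is a simplified sketch, not the real NonNegative analysis: the `Node` enum and `non_negative` function are hypothetical, and the `Constant`/`Get` leaves are assumed non-negative here purely to keep the example short.

```rust
/// Hypothetical operators, listed in post-order (children before parents).
#[derive(Debug)]
enum Node {
    Constant,
    Get,
    Changes,
    Project, // single-input operator; its child is the preceding entry
}

/// Computes one boolean per node, in post-order.
fn non_negative(post_order: &[Node]) -> Vec<bool> {
    let mut results: Vec<bool> = Vec::with_capacity(post_order.len());
    for (index, node) in post_order.iter().enumerate() {
        let value = match node {
            // Assumption for this sketch only: these leaves are non-negative.
            Node::Constant | Node::Get => true,
            // Explicit leaf arm: conservatively "possibly retracting". Without it,
            // a Changes leaf at index 0 would reach the `index - 1` lookup below.
            Node::Changes => false,
            // Single-input operators inherit the result of their child.
            Node::Project => results[index - 1],
        };
        results.push(value);
    }
    results
}
```

Writing the match without a wildcard arm, as in this sketch, makes the compiler flag any newly added leaf instead of letting it fall into the child-index lookup.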

https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M

2 participants