Implement CHANGES table function for changelog reads#36869
Draft
antiguru wants to merge 12 commits into
Draft
Conversation
Foundation for the CHANGES(<collection>, <as_of>) table function (reads a collection as an append-only changelog). This lands the verified, independently testable pieces; the IR/execution wiring follows. - Compute reinterpretation (highest-risk core): pack_changelog_row + an append-only changelog transform in mz_compute::render, gated by the new SourceImport::read_as_changelog flag. Consolidates before packing the diff into a column so the snapshot collapses to as_of. Unit-tested. - SourceImport carries read_as_changelog through compute-types/compute-client; DataflowDescription::set_source_read_as_changelog lets the coordinator mark a changelog source import. import_source gains the flag (default false). - Parser/AST: non-reserved CHANGES keyword + TableFactor::Changes, parsed in parse_table_factor, with display + roundtrip tests. Name resolver handles it. - Planner: plan_changes gates on the feature flag, requires a persist-backed object (table/source/materialized view), and evaluates the constant as_of; execution wiring is bail_unsupported for now. - Feature flag enable_changes_table_function. https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Adds the dedicated, optimizer-opaque MirRelationExpr::Changes node and wires it
through the IR pipeline (the SQL surface now plans through to LIR):
- mz-expr: MirRelationExpr::Changes { id, typ, as_of } leaf; handled in typ,
arity, keys, visitors, depends_on, and EXPLAIN. No new proto/Arbitrary.
- mz-transform: Changes treated as an opaque barrier leaf across analyses and
transforms (registers nothing in gets maps). The typechecker accepts its
extended (input + mz_timestamp + mz_diff) type as-is without cross-checking
against the catalog type of id.
- mz-compute-types: LIR lowering lowers Changes to a Get of the (changelog-
marked) source import; rendering already appends the columns and emits +1.
- mz-sql: HirRelationExpr::Changes + HIR->MIR lowering; plan_changes now builds
the extended RelationDesc and produces the node (no longer bails).
Remaining: coordinator wiring (mark the source import read_as_changelog with the
extended type, set the dataflow as_of to the changelog as_of, take the read
hold, and peek at the latest time).
https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
In the peek optimizer's finalize, scan the dataflow for MirRelationExpr::Changes nodes, mark the corresponding source imports read_as_changelog, and pin the dataflow as_of to the earliest changelog start (so the snapshot is taken there and later changes replay as appends; the peek still happens at the determined timestamp). The dataflow's implied read hold pins the input's since at the changelog as_of for the query's lifetime. This completes the one-off SELECT ... FROM CHANGES(coll, as_of) path. https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
- test/sqllogictest/changes.slt: deterministic plan-time coverage (feature gating; rejection of views, CTEs, unmaterializable/negative as_of). - test/testdrive/changes.td: data round-trip. Captures a valid as_of via mz_now(), then asserts the snapshot collapses to as_of and later changes append (insert as +1, delete as a row with mz_diff = -1), and that CHANGES composes with GROUP BY/aggregation. Asserts data + mz_diff only, since the per-update timestamp is non-deterministic. https://claude.ai/code/session_01HjVe9kzuaFvEP9jXou2iZ5
Replace the comma-separated `CHANGES(name, expr)` argument with an
`AS OF [AT LEAST] <bound>` clause that mirrors SUBSCRIBE's surface. The
bound now varies along two orthogonal axes:
* strict (`AS OF`) vs. advisory (`AS OF AT LEAST`), carried by AsOf
* fixed (constant) vs. sliding (`mz_now()`-relative), detected via
contains_temporal()
Gate by lifetime: a durable maintained object (materialized view,
index) requires a sliding bound, since a fixed lower bound would hold
the input's `since` open indefinitely. A sliding bound parses and
passes gating but its dataflow wiring (output temporal filter + lagging
read policy) is not yet implemented, so it currently bail_unsupported.
The fixed-bound one-off SELECT path is unchanged.
Update the design doc with the two-axis model, the pruned
lifetime x bound matrix, and the resolved parameter-keyword question.
Carry the changelog lower bound as an mz_timestamp-typed MirScalarExpr on the Changes node instead of a pre-folded Timestamp, and evaluate it in the peek coordinator at the query time (resolving mz_now()). A fixed bound folds to a constant; a sliding mz_now()-relative bound yields as_of = query_time - lag, so a one-off SELECT reads the window [query_time - lag, query_time] with no output temporal filter needed (the peek time is the upper edge). Gating: only one-off SELECTs are wired for execution; durable maintained objects with a fixed bound are rejected (indefinite hold), and other non-SELECT uses bail as unsupported. The maintained sliding case (lagging read policy) remains a follow-up. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The CHANGES relation argument can now be either a collection named directly or
a parenthesized subquery, mirroring SUBSCRIBE's surface. The AST gains a
ChangesRelation { Name, Query } enum; the planner resolves either form to the
single persist-backed collection it reads. A subquery must reduce to a bare
read (identity projection / empty map over a global Get) of a table, source, or
materialized view; anything that filters or transforms is the deferred
arbitrary-expression case and is rejected as unsupported. Unlike the name form,
a subquery transparently reads through a non-materialized view (which inlines)
to its underlying persist collection.
https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
Thread the strict/advisory distinction (AS OF vs AS OF AT LEAST) onto the Changes IR node, and the inputs' read frontier (determination.since) into the peek optimizer. When resolving the changelog as_of at the query time, an advisory bound is clamped up to `since` (aging in to the earliest available history rather than erroring), while a strict bound is pinned as written (if it precedes `since`, dataflow creation errors, surfacing the unavailable history). The peek's existing read holds already pin `since` for the query duration, so no new hold is needed; the dataflow as_of must clear `since` anyway, making the clamp both correct and required. Non-peek callers (frontend_peek SELECT path) thread the same frontier; explain-only paths (insights) pass a minimal since. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
The Changes variant was missing an arm in MreDiff (the structured-diff backing MirRelationExpr::eq), so two identical Changes nodes compared unequal via eq while cmp reported them equal, tripping the eq/cmp consistency soft-assert. In CI's debug build this panicked the optimizer (its fixpoint compares an expression to its prior form), so every CHANGES execution failed. Add the Changes arm and a regression test asserting eq agrees with cmp. Also: - Exclude changes.slt from --auto-index-selects: CHANGES is rejected in an index/materialized-view context, so wrapping its SELECTs in an indexed view is intentionally inconsistent. - Shorten the durable-lifetime gating error to a one-line message and move the mz_now()-relative suggestion into a hint (new PlanError::ChangesRequiresSlidingBound). https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
- Design doc: add "Maintained materialized views: sliding execution" describing the intended (not-yet-implemented) MV path — gating, a lagging read policy on the input, a continuous output temporal filter, and bounded-restart semantics — and why it is deferred until it can be runtime-tested as a unit. - testdrive: add a CHANGES-over-materialized-view round-trip, covering a non-table persist-backed input and changes flowing through the MV. https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
…s leaf
The NonNegative analysis used a `_ => results[index - 1]` catch-all for
single-input operators, with explicit arms only for the `Constant` and `Get`
leaves. The `Changes` leaf was never given an arm, so it fell into the
catch-all: for a `Changes` node at post-order index 0 (e.g. `SELECT a, mz_diff
FROM CHANGES(t)`, which optimizes to `Project(Changes)`), that computes
`results[0 - 1]` = `results[usize::MAX]` and panics the optimizer with an
out-of-bounds index ("the len is 2 but the index is 18446744073709551615").
A `count(*)` plan hit the same underflow with a larger node count.
Add an explicit `Changes => false` leaf arm: a changelog read is conservatively
treated as possibly-retracting (the maintained sliding case ages rows out of
the window), and, like Constant/Get, never reaches the indexing catch-all.
Other Changes arms (eq/cmp, monotonic, equivalences, arity, types, keys, column
names, cardinality) were added when the variant was introduced because those
matches are exhaustive; only NonNegative's catch-all let the leaf slip through.
Add a regression test running the analysis on `Project(Changes)`.
https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Closes #4527
Users want to consume the change stream of a collection as a relation they can transform with SQL.
SUBSCRIBEalready exposes this stream, but only as a top-level streaming statement whose output flows directly to the client — it cannot be wrapped in aSELECT, aggregated, joined, or materialized.This PR implements the
CHANGEStable function, which reads a collection's changelog as an append-only relation with the per-update timestamp and diff available as ordinarymz_timestamp/mz_diffcolumns.Syntax
<relation>is a collection named directly or a parenthesized subquery (mirroringSUBSCRIBE). A subquery must reduce to a bare read of a single persist-backed object (table, source, materialized view); anything that filters/transforms is the deferred arbitrary-expression case and is rejected. A subquery transparently reads through a non-materialized view to its underlying collection.AS OFvsAS OF AT LEAST— strict (error if history unavailable) vs advisory (age in from the input'ssince). Reuses the existingAsOfgrammar; the parens around a subquery delimit it from this trailingAS OF.mz_now()-relative bound is a sliding window that trails real time (the trailing edge retracts).The bound's shape transparently signals the semantics; the design deliberately reuses
AS OF(Materialize's logical-time read-hold meaning, distinct from SQL:2011), resolving the design's "parameter keyword" question.Where it is allowed
mz_now()-relative) boundSELECTas_of = query_time − lag; window[query_time − lag, query_time])sinceindefinitely)Implementation
src/sql-parser):TableFactor::Changes { relation: ChangesRelation, as_of: AsOf, .. }, whereChangesRelationisName | Query(mirrorsSubscribeRelation).AS OF [AT LEAST]clause required.src/sql/src/plan/):plan_changesresolves the relation (name, or subquery reduced to a bare globalGet) to a single persist-backed collection, classifies the bound (strict/advisory, fixed/sliding), gates by lifetime, and builds the changelog schema (input columns +mz_timestamp+mz_diff). Feature-flagged byenable_changes_table_function.src/expr,src/sql/src/plan/):MirRelationExpr::Changes { id, typ, bound: MirScalarExpr }— an opaque leaf carrying the lower-bound scalar; lowers to aGetof the source import withread_as_changelogset.src/adapter/src/optimize/peek.rs): evaluates each changelogboundat the query time (resolvingmz_now()) to pin the dataflowas_of, and marks the source imports.src/compute/src/render.rs): consolidates the source (snapshot collapses toas_of), then reinterprets each(row, time, diff)as the append((row, time, diff), max(time, as_of), 1)— append-only, matchingSUBSCRIBE's diff format.src/transform/):Changesis an opaque barrier leaf in all analyses; nothing is pushed into or through it.Verification
test/sqllogictest/changes.slt: feature gating, argument validation (persist-backed only; views/CTEs rejected), bound validation, name-and-subquery forms, durable-lifetime gating, slidingmz_now()plan+run.test/testdrive/changes.td: data round-trip — snapshot atas_ofplus later inserts/deletes appear as appends with correctmz_diff; sliding-bound and subquery forms.pack_changelog_rowunit tests.Follows the design doc (
doc/developer/design/20260602_changes_table_function.md).Follow-ups (tracked in the design doc's open questions)
AS OF AT LEAST) clamping: age the bound up to the input'ssincerather than erroring.CHANGES(generalize the subquery form beyond a bare read) — the time-invariance / optimizer-barrier work.https://claude.ai/code/session_018drjq1ZvKQgh931ix5vv1M