
feat: extract and populate RowKeys from sorted batches#6292

Open
g-talbot wants to merge 93 commits into gtt/sorted-series-key from gtt/row-keys-extraction

Conversation

@g-talbot
Contributor

Summary

  • New row_keys module: extract_row_keys() reads sort schema column values at the first and last rows of a sorted batch, building a sortschema::RowKeys proto with min_row_values, max_row_values, and all_inclusive_max_row_values
  • prepare_write() now computes RowKeys after sorting and injects qh.row_keys (base64) + qh.row_keys_json into Parquet KV metadata
  • write_to_bytes / write_to_file_with_metadata return the RowKeys proto bytes alongside the write result
  • split_writer captures the returned bytes and stores them on MetricsSplitMetadata.row_keys_proto
  • Column type mapping: Dictionary(Int32,Utf8) / Utf8 → TypeString, Int64 → TypeInt, UInt64 → TypeInt (cast), Float64 → TypeFloat; null values → ColumnValue { value: None }

Stacked on #6290 (sorted_series column).
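The extraction described in the first bullet can be sketched in plain Rust. `ColumnValue` and `RowKeys` below are hypothetical stand-ins for the Arrow arrays and the `sortschema` proto types, so this only illustrates the first-row/last-row logic, not the real API:

```rust
// Hypothetical stand-ins for the Arrow column values and the sortschema proto.
#[derive(Clone, Debug, PartialEq)]
enum ColumnValue {
    Str(String),
    Int(i64),
    Float(f64),
    Null, // corresponds to ColumnValue { value: None } in the proto
}

struct RowKeys {
    min_row_values: Vec<ColumnValue>,
    max_row_values: Vec<ColumnValue>,
}

/// Read the sort-schema columns at the first and last row of a sorted batch.
/// `batch` is column-major: batch[col][row]. A single-row batch yields
/// identical min and max values; an empty batch yields no row keys.
fn extract_row_keys(batch: &[Vec<ColumnValue>]) -> Option<RowKeys> {
    let num_rows = batch.first()?.len();
    if num_rows == 0 {
        return None;
    }
    let min_row_values = batch.iter().map(|col| col[0].clone()).collect();
    let max_row_values = batch.iter().map(|col| col[num_rows - 1].clone()).collect();
    Some(RowKeys { min_row_values, max_row_values })
}
```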

Test plan

  • 11 row_keys tests: empty batch, single row min==max, multi-row first/last, null encoding, missing columns, column count, proto round-trip (with and without nulls), Parquet KV metadata round-trip, split_writer integration
  • All 209 quickwit-parquet-engine tests pass
  • Clippy clean, formatted, license headers pass

🤖 Generated with Claude Code

mattmkim and others added 30 commits March 30, 2026 14:21
…, window, TableConfig

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… model, field lookup

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wire TableConfig-driven sort order into ParquetWriter and add
self-describing Parquet file metadata for compaction:

- ParquetWriter::new() takes &TableConfig, resolves sort fields at
  construction via parse_sort_fields() + ParquetField::from_name()
- sort_batch() uses resolved fields with per-column direction (ASC/DESC)
- SS-1 debug_assert verification: re-sort and check identity permutation
- build_compaction_key_value_metadata(): embeds sort_fields, window_start,
  window_duration, num_merge_ops, row_keys (base64) in Parquet kv_metadata
- SS-5 verify_ss5_kv_consistency(): kv_metadata matches source struct
- write_to_file_with_metadata() replaces write_to_file()
- prepare_write() shared method for bytes and file paths
- ParquetWriterConfig gains to_writer_properties_with_metadata()
- ParquetSplitWriter passes TableConfig through
- All callers in quickwit-indexing updated with TableConfig::default()
- 23 storage tests pass including META-07 self-describing roundtrip

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…publish

Add compaction metadata to the PostgreSQL metastore:

Migration 27:
- 6 new columns: window_start, window_duration_secs, sort_fields,
  num_merge_ops, row_keys, zonemap_regexes
- Partial index idx_metrics_splits_compaction_scope on
  (index_uid, sort_fields, window_start) WHERE split_state = 'Published'

stage_metrics_splits:
- INSERT extended from 15 to 21 bind parameters for compaction columns
- ON CONFLICT SET updates all compaction columns

list_metrics_splits:
- PgMetricsSplit construction includes compaction fields (defaults from JSON)

Also fixes pre-existing compilation errors on upstream-10b-parquet-actors:
- Missing StageMetricsSplitsRequestExt import
- index_id vs index_uid type mismatches in publish/mark/delete
- IndexUid binding (to_string() for sqlx)
- ListMetricsSplitsResponseExt trait disambiguation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ublish validation

Close critical gaps identified during port review:

split_writer.rs:
- Store table_config on ParquetSplitWriter (not just pass-through)
- Compute window_start from batch time range using table_config.window_duration_secs
- Populate sort_fields, window_duration_secs, parquet_files on metadata before write
- Call write_to_file_with_metadata(Some(&metadata)) to embed KV metadata in Parquet
- Update size_bytes after write completes

metastore/mod.rs:
- Add window_start and sort_fields fields to ListMetricsSplitsQuery
- Add with_compaction_scope() builder method

metastore/postgres/metastore.rs:
- Add compaction scope filters (AND window_start = $N, AND sort_fields = $N) to list query
- Add replaced_split_ids count verification in publish_metrics_splits
- Bind compaction scope query parameters

ingest/config.rs:
- Add table_config: TableConfig field to ParquetIngestConfig

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
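The "compute window_start from batch time range" step above presumably aligns the batch's minimum timestamp down to a window boundary. A minimal sketch, assuming that floor-alignment semantics (the function name and signature are illustrative, not the PR's actual API):

```rust
/// Align a batch's minimum timestamp down to its compaction window boundary.
/// Hypothetical helper; the real code derives this from
/// table_config.window_duration_secs.
fn window_start_secs(min_timestamp_secs: i64, window_duration_secs: i64) -> i64 {
    // Euclidean floor division so pre-epoch (negative) timestamps still
    // land on the lower boundary of their window.
    min_timestamp_secs.div_euclid(window_duration_secs) * window_duration_secs
}
```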
…ad code removal

- file_backed_index/mod.rs: Add window_start and sort_fields filtering
  to metrics_split_matches_query() for compaction scope queries
- writer.rs: Add test_meta07_self_describing_parquet_roundtrip test
  (writes compaction metadata to Parquet, reads back from cold file,
  verifies all fields roundtrip correctly)
- fields.rs: Remove dead sort_order() method (replaced by TableConfig)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…regexes

Gap 1: Change window_duration_secs from i32 to Option<i32> in both
PgMetricsSplit and InsertableMetricsSplit. Pre-Phase-31 splits now
correctly map 0 → NULL in PostgreSQL, enabling Phase 32 compaction
queries to use `WHERE window_duration_secs IS NOT NULL` instead of
the fragile `WHERE window_duration_secs > 0`.

Gap 2: Change zonemap_regexes from String to serde_json::Value in
both structs. This maps directly to JSONB in sqlx, avoiding ambiguity
when PostgreSQL JSONB operators are used in Phase 34/35 zonemap pruning.

Gap 3: Add two missing tests:
- test_insertable_from_metadata_with_compaction_fields: verifies all 6
  compaction fields round-trip through InsertableMetricsSplit
- test_insertable_from_metadata_pre_phase31_defaults: verifies pre-Phase-31
  metadata produces window_duration_secs: None, zonemap_regexes: json!({})

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…! macro

11 tests covering the full metrics split lifecycle:
- stage (happy path + non-existent index error)
- stage upsert (ON CONFLICT update)
- list by state, time range, metric name, compaction scope
- publish (happy path + non-existent split error)
- mark for deletion
- delete (happy path + idempotent non-existent)

Tests are generic and run against both file-backed and PostgreSQL backends.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- publish_metrics_splits: return NotFound (not FailedPrecondition) when
  staged splits don't exist
- delete_metrics_splits: succeed silently (idempotent) for non-existent
  splits instead of returning FailedPrecondition
- Tests now assert the correct error types on both backends

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Migration 27: add maturity_timestamp, delete_opstamp, node_id columns
  and publish_timestamp trigger to match the splits table (Paul's review)
- ListMetricsSplitsQuery: adopt FilterRange<i64> for time_range (matching
  log-side pattern), single time_range field for both read and compaction
  paths, add node_id/delete_opstamp/update_timestamp/create_timestamp/
  mature filters to close gaps with ListSplitsQuery
- Use SplitState enum instead of stringly-typed Vec<String> for split_states
- StoredMetricsSplit: add create_timestamp, node_id, delete_opstamp,
  maturity_timestamp so file-backed metastore can filter on them locally
- File-backed filter: use FilterRange::overlaps_with() for time range and
  window intersection, apply all new filters matching log-side predicate
- Postgres: intersection semantics for window queries, FilterRange-based
  SQL generation for all range filters
- Fix InsertableMetricsSplit.window_duration_secs from Option<i32> to i32
- Rename two-letter variables (ws, sf, dt) throughout

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Extract duplicated invariant logic into a shared `invariants/` module
within `quickwit-dst`. This is the "single source of truth" layer in
the verification pyramid — used by stateright models, production
debug_assert checks, and (future) Datadog metrics emission.

Key changes:
- `invariants/registry.rs`: InvariantId enum (20 variants) with Display
- `invariants/window.rs`: shared window_start_secs(), is_valid_window_duration()
- `invariants/sort.rs`: generic compare_with_null_ordering() for SS-2
- `invariants/check.rs`: check_invariant! macro wrapping debug_assert
- stateright gated behind `model-checking` feature (optional dep)
- quickwit-parquet-engine uses shared functions and check_invariant!

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
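A generic null-aware comparison like the `compare_with_null_ordering()` mentioned above might look as follows. This sketch assumes nulls-first ordering; the actual null placement in the shared module may differ:

```rust
use std::cmp::Ordering;

/// Compare two optional values with an explicit null ordering.
/// Sketch only: nulls sort first here, matching SQL's NULLS FIRST;
/// the production SS-2 check may use the opposite convention.
fn compare_with_null_ordering<T: Ord>(a: &Option<T>, b: &Option<T>) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => Ordering::Less,
        (Some(_), None) => Ordering::Greater,
        (Some(x), Some(y)) => x.cmp(y),
    }
}
```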
The check_invariant! macro now always evaluates the condition — not just
in debug builds. This implements Layer 4 (Production) of the verification
stack: invariant checks run in release, with results forwarded to a
pluggable InvariantRecorder for Datadog metrics emission.

- Debug builds: panic on violation (debug_assert, Layer 3)
- All builds: evaluate condition, call recorder (Layer 4)
- set_invariant_recorder() wires up statsd at process startup
- No recorder registered = no-op (single OnceLock load)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
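The always-evaluate-plus-recorder design described above can be sketched like this. The trait and macro shapes are assumptions based on the commit message, not the crate's actual definitions:

```rust
use std::sync::OnceLock;

/// Pluggable sink for invariant check results; a production build would
/// forward these to a metrics exporter. Hypothetical trait shape.
pub trait InvariantRecorder: Send + Sync {
    fn record(&self, invariant: &str, violated: bool);
}

static RECORDER: OnceLock<Box<dyn InvariantRecorder>> = OnceLock::new();

/// Wire up a recorder once at process startup; later calls are no-ops.
pub fn set_invariant_recorder(recorder: Box<dyn InvariantRecorder>) {
    let _ = RECORDER.set(recorder);
}

// In the real crate this macro would be #[macro_export]ed.
macro_rules! check_invariant {
    ($id:expr, $cond:expr) => {{
        let holds = $cond; // always evaluated, even in release builds
        if let Some(recorder) = RECORDER.get() {
            recorder.record($id, !holds); // Layer 4: report in all builds
        }
        debug_assert!(holds, "invariant {} violated", $id); // Layer 3: panic in debug
    }};
}
```

With no recorder registered, the only overhead per check is the condition itself plus one `OnceLock` load.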
Emit cloudprem.pomsky.invariant.checked and .violated counters with
invariant label via the metrics crate / DogStatsD exporter at process
startup, completing Layer 4 of the verification stack.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
g-talbot and others added 25 commits April 8, 2026 15:02
Reverts c8bf8d7, cafcac5, a088f53 — these are code changes
(delete_metrics_splits error handling, doc comment tweaks) that
don't belong in a docs-only PR. They will land in a separate PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This ADR contains company-specific information and should live
in the private fork, not in the upstream quickwit-oss repo.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Rewrite CLAUDE.md as generic Quickwit AI development guide
- Replace Quickhouse-Pomsky -> Quickwit branding across all docs
- Replace "Datadog" observability references with generic
  "production observability" language
- Remove "Husky (Datadog)" qualifier from gap docs (keep Husky
  citations — the blog post is public)
- Generalize internal knowledge (query rate numbers, product-specific
  lateness guarantees)
- Remove PomChi reference, private Google Doc link
- Add docs/internals/UPSTREAM-CANDIDATES.md for pomsky tracking

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove all ClickHouse/ClickStack references from gap docs and ADRs
  (keep Prometheus, Mimir, InfluxDB, Husky as prior art)
- Restore gap-005 Option C (compaction-time dedup) without ClickHouse citation
- Mark /sesh-mode reference in CLAUDE.md as aspirational
- Add aspirational items section to UPSTREAM-CANDIDATES.md tracking
  items described in docs but not yet implemented (TLA+ specs, DST,
  Kani, Bloodhound, performance baselines, benchmark binaries)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
UPSTREAM-CANDIDATES.md incorrectly stated TLA+ specs and Stateright
models don't exist. They do (contributed in #6246): ParquetDataModel.tla,
SortSchema.tla, TimeWindowedCompaction.tla, plus quickwit-dst invariants
and Stateright model tests. Updated to accurately reflect that the
remaining aspirational piece is the simulation infrastructure (SimClock,
FaultInjector, etc.).

Also removed the /sesh-mode aspirational entry — it's actively being
used and the underlying specs/models are real.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Prevents GSD planning artifacts from being committed to the repository.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reverts test env vars (CP_ENABLE_REVERSE_CONNECTION) and
load-cloudprem-ui target — these are pomsky-specific and
don't belong in upstream.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…treaming merge (#6281)

* feat: enforce physical column ordering in Parquet files

Sort schema columns are written first (in their configured sort order),
followed by all remaining data columns in alphabetical order. This
physical layout enables a two-GET streaming merge during compaction:
the footer GET provides the schema and offsets, then a single streaming
GET from the start of the row group delivers sort columns first —
allowing the compactor to compute the global merge order before data
columns arrive.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
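The physical layout rule described above (sort-schema columns first in their configured order, then the remaining columns alphabetically) can be sketched as a pure function over column names. This is an illustration, not the PR's actual schema-reordering code:

```rust
/// Order column names so sort-schema columns come first, in sort order,
/// followed by the remaining columns sorted alphabetically.
fn physical_column_order(sort_schema: &[&str], all_columns: &[&str]) -> Vec<String> {
    // Sort columns first, keeping only those present in the batch schema.
    let mut ordered: Vec<&str> = sort_schema
        .iter()
        .copied()
        .filter(|name| all_columns.contains(name))
        .collect();
    // Remaining data columns, alphabetical.
    let mut rest: Vec<&str> = all_columns
        .iter()
        .copied()
        .filter(|name| !sort_schema.contains(name))
        .collect();
    rest.sort();
    ordered.extend(rest);
    ordered.into_iter().map(String::from).collect()
}
```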

* test: verify input column order is actually scrambled

The sanity check only asserted presence, not ordering. Now it
verifies that host appears before service in the input (scrambled)
which is the opposite of the sort-schema order (service before host).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: rustfmt test code

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: collapse nested if to satisfy clippy::collapsible_if

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add a timeseries_id column (Int64) to the metrics Arrow batch,
computed as a SipHash-2-4 of the series identity columns (metric_name,
metric_type, and all tags excluding temporal/value columns). The hash
uses fixed keys for cross-process determinism.

The column is already declared in the metrics default sort schema
(between host and timestamp_secs), so the parquet writer now
automatically sorts by it and places it in the correct physical
position.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
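The series-identity hash described above can be sketched as follows. The PR uses SipHash-2-4 with fixed keys for cross-process determinism; this sketch substitutes std's `DefaultHasher` (SipHash-1-3, deterministic only within one process) so it stays dependency-free, and the `EXCLUDED_TAGS` list is a guess:

```rust
use std::collections::BTreeMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

// Hypothetical exclusion list; the real EXCLUDED_TAGS lives in the PR.
const EXCLUDED_TAGS: &[&str] = &["timestamp_secs", "value", "timeseries_id", "sorted_series"];

/// Hash the series identity: metric_name, metric_type, and all non-excluded
/// tags. Stand-in for the PR's SipHash-2-4-with-fixed-keys implementation.
fn timeseries_id(metric_name: &str, metric_type: &str, tags: &BTreeMap<String, String>) -> i64 {
    let mut hasher = DefaultHasher::new();
    hasher.write(metric_name.as_bytes());
    hasher.write(&[0]); // separator avoids boundary ambiguity ("ab"+"c" vs "a"+"bc")
    hasher.write(metric_type.as_bytes());
    hasher.write(&[0]);
    // BTreeMap iterates in key order, so tag insertion order cannot affect the hash.
    for (key, value) in tags {
        if EXCLUDED_TAGS.contains(&key.as_str()) {
            continue; // temporal/value columns are not part of series identity
        }
        hasher.write(key.as_bytes());
        hasher.write(&[0]);
        hasher.write(value.as_bytes());
        hasher.write(&[0]);
    }
    hasher.finish() as i64
}
```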
The timeseries_id hash is persisted to Parquet files — any change
silently corrupts compaction and queries. Add:

- 3 pinned stability tests with hardcoded expected hash values
- 3 proptest properties (order independence, excluded tag immunity,
  extra-tag discrimination) each running 256 random cases
- Boundary ambiguity test ({"ab":"c"} vs {"a":"bc"})
- Same-series-different-timestamp invariant test
- All-excluded-tags coverage (every EXCLUDED_TAGS entry verified)
- Edge cases: empty strings, unicode, 100-tag cardinality
- Module-level doc explaining the stability contract

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Mirror the CTE + FOR UPDATE pattern from delete_splits to prevent
stale-state races. Without row locking, a concurrent
mark_metrics_splits_for_deletion can commit between the state read
and the DELETE, causing spurious FailedPrecondition errors and retry
churn.

The new query locks the target rows before reading their state,
reports not-deletable (Staged/Published) and not-found splits
separately, and only deletes when all requested splits are in
MarkedForDeletion state.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Compute a composite, lexicographically sortable binary column
(sorted_series) at Parquet write time using storekey order-preserving
encoding. For each row the key encodes:

  1. Non-null sort schema tag columns as (ordinal: u8, value: str)
  2. timeseries_id (i64) as final discriminator

Identical timeseries always produce identical byte keys regardless of
timestamp or value, enabling DataFusion's streaming AggregateExec and
BoundedWindowAggExec with O(1) memory instead of O(N) hash tables.

Also fixes create_nullable_dict_array which used the original array
index as dictionary key instead of the position in the unique values
array, causing out-of-bounds panics for mixed null/non-null inputs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
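The composite key layout described above can be sketched with a hand-rolled order-preserving encoding. The PR uses the `storekey` crate; the encoding details below (null-byte terminator, sign-bit flip for i64) are illustrative substitutes that preserve the same property, namely byte order matching logical order:

```rust
/// Append one (ordinal, value) component; the null terminator keeps a
/// shorter value from being a prefix of a longer one.
fn encode_component(key: &mut Vec<u8>, ordinal: u8, value: &[u8]) {
    key.push(ordinal); // sort-schema position prefixes every component
    key.extend_from_slice(value);
    key.push(0);
}

/// Flipping the sign bit makes big-endian byte order match numeric order.
fn encode_i64_ordered(value: i64) -> [u8; 8] {
    ((value as u64) ^ (1 << 63)).to_be_bytes()
}

/// Build the composite key: non-null tag components in sort-schema order,
/// then timeseries_id with its own ordinal as the final discriminator.
fn sorted_series_key(tags: &[(u8, Option<&str>)], ts_ordinal: u8, timeseries_id: i64) -> Vec<u8> {
    let mut key = Vec::new();
    for (ordinal, value) in tags {
        if let Some(value) = value {
            encode_component(&mut key, *ordinal, value.as_bytes()); // null tags are skipped
        }
    }
    key.push(ts_ordinal); // every component gets an ordinal prefix, including timeseries_id
    key.extend_from_slice(&encode_i64_ordered(timeseries_id));
    key
}
```

Because identical series produce identical keys regardless of timestamp, sorting on this column makes equal keys contiguous, which is exactly what streaming aggregation needs.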
Without the ordinal, the timeseries_id bytes could collide with a
subsequent tag column's ordinal+string encoding. Every component in
the key now consistently gets an ordinal prefix from its sort schema
position.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add tests that assert:
- timeseries_id gets ordinal 6 prefix (its sort schema position)
- key length is exact: ordinal(1) + str(2) + ordinal(1) + i64(8) = 12
- when timeseries_id is absent, no trailing ordinal appears

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Writes a 6-row batch with 4 distinct series (including null tags)
through the ParquetWriter pipeline, reads back, and verifies:

- 4 distinct keys produced (series identity)
- series with 3 rows produces 3 identical keys
- null host differs from present host (ordinal skipping)
- all-null tags differ from partial-null tags
- ordinal bytes are correct (0x00 for metric_name, 0x01 for service,
  0x06 for timeseries_id) even when intermediate tags are null
- equal keys are contiguous after sort (streaming aggregation ready)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Regenerate storekey entry via dd-rust-license-tool (correct authors)
- Fix 4 rustfmt nightly formatting diffs in sorted_series tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Compute sort-key boundaries (first/last row values) during Parquet
write and store them as RowKeys proto in both the Parquet KV metadata
and MetricsSplitMetadata. The compactor uses these ranges to determine
key-range overlap between splits and to select merge boundaries.

- New row_keys module: extract_row_keys() reads sort schema column
  values at rows 0 and N-1, mapping Arrow types to ColumnValue proto
- prepare_write() now computes row_keys after sorting and injects
  qh.row_keys (base64) + qh.row_keys_json into Parquet KV metadata
- write_to_bytes/write_to_file_with_metadata return row_keys proto
  bytes alongside the write result
- split_writer captures and stores row_keys_proto on metadata

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
End-to-end test with 8 rows across 4 series (including null tags,
mixed metrics, wide timestamp range). Writes through the full
split_writer pipeline and verifies the RowKeys proto is byte-identical
in all three places:

1. MetricsSplitMetadata.row_keys_proto (feeds Postgres)
2. Parquet file KV metadata (qh.row_keys, base64-encoded)
3. InsertableMetricsSplit.row_keys (Postgres column)

Also checks: min/max column values match expected sort order,
all_inclusive_max equals max, JSON metadata is parseable, and
the split is not marked expired.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot requested a review from mattmkim April 10, 2026 20:59
@mattmkim
Contributor

@codex review


@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1b444d2da2


Comment on lines +65 to +67
let batch_idx = match batch_schema.index_of(&col_def.name) {
    Ok(idx) => idx,
    Err(_) => {


P2: Map timestamp sort field to timestamp_secs

When sort_fields uses the legacy timestamp token (for example ...|timestamp/V2), this lookup uses col_def.name verbatim against the Arrow schema, which only contains timestamp_secs. In that case index_of fails and the code encodes None for the timestamp boundary, so the generated RowKeys lose their time range and can make overlap/range decisions incorrect for indices configured with timestamp. Normalize timestamp to timestamp_secs before schema lookup (as done in sorted_series).
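The normalization the reviewer suggests could be as simple as the following (function name is hypothetical; the real fix would mirror whatever `sorted_series` does):

```rust
/// Map the legacy sort-field token `timestamp` to the Arrow column name
/// `timestamp_secs` before schema lookup. Hypothetical helper.
fn normalize_sort_column(name: &str) -> &str {
    if name == "timestamp" { "timestamp_secs" } else { name }
}
```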


Comment on lines +398 to +402
if let Some(ref rk_bytes) = row_keys_proto {
    kv_entries.push(KeyValue::new(
        PARQUET_META_ROW_KEYS.to_string(),
        BASE64.encode(rk_bytes),
    ));


P2: Avoid appending duplicate qh.row_keys entries

kv_entries is seeded from build_compaction_key_value_metadata(split_metadata), which already includes qh.row_keys when split_metadata.row_keys_proto is present, and this block appends another qh.row_keys unconditionally from computed bytes. That produces duplicate metadata keys for non-empty batches with pre-populated metadata, and different readers may resolve first/last duplicate differently, yielding inconsistent recovered RowKeys. Update/replace the existing key instead of always pushing a new one.
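The update-or-insert the reviewer asks for could look like this sketch, with a stand-in `KeyValue` struct for the Parquet metadata type:

```rust
/// Stand-in for parquet's key-value metadata entry type.
struct KeyValue {
    key: String,
    value: Option<String>,
}

/// Replace an existing entry for `key` in place; only append when absent,
/// so readers never see duplicate metadata keys.
fn upsert_kv(entries: &mut Vec<KeyValue>, key: &str, value: String) {
    match entries.iter_mut().find(|entry| entry.key == key) {
        Some(entry) => entry.value = Some(value),
        None => entries.push(KeyValue { key: key.to_string(), value: Some(value) }),
    }
}
```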


@mattmkim mattmkim force-pushed the gtt/sorted-series-key branch from 652d128 to 069a5a1 Compare April 15, 2026 22:30


4 participants