feat: compute deterministic timeseries_id column at ingest #6286
Conversation
…, window, TableConfig Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… model, field lookup Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wire TableConfig-driven sort order into ParquetWriter and add self-describing Parquet file metadata for compaction:
- ParquetWriter::new() takes &TableConfig, resolves sort fields at construction via parse_sort_fields() + ParquetField::from_name()
- sort_batch() uses resolved fields with per-column direction (ASC/DESC)
- SS-1 debug_assert verification: re-sort and check identity permutation
- build_compaction_key_value_metadata(): embeds sort_fields, window_start, window_duration, num_merge_ops, row_keys (base64) in Parquet kv_metadata
- SS-5 verify_ss5_kv_consistency(): kv_metadata matches source struct
- write_to_file_with_metadata() replaces write_to_file()
- prepare_write() shared method for bytes and file paths
- ParquetWriterConfig gains to_writer_properties_with_metadata()
- ParquetSplitWriter passes TableConfig through
- All callers in quickwit-indexing updated with TableConfig::default()
- 23 storage tests pass, including the META-07 self-describing roundtrip
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
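The per-column ASC/DESC sort described above can be illustrated with plain index sorting. A std-only sketch — the names (`sorted_row_order`, `SortDirection`) are hypothetical, and the real code operates on Arrow arrays rather than `Vec<i64>`:

```rust
use std::cmp::Ordering;

#[derive(Clone, Copy)]
enum SortDirection { Asc, Desc }

/// Sort row indices by several key columns with per-column direction,
/// mirroring what a sort_batch() over resolved sort fields would do.
fn sorted_row_order(columns: &[(Vec<i64>, SortDirection)]) -> Vec<usize> {
    let n = columns.first().map_or(0, |(c, _)| c.len());
    let mut idx: Vec<usize> = (0..n).collect();
    idx.sort_by(|&a, &b| {
        for (col, dir) in columns {
            let ord = col[a].cmp(&col[b]);
            let ord = match dir {
                SortDirection::Asc => ord,
                SortDirection::Desc => ord.reverse(),
            };
            if ord != Ordering::Equal {
                return ord;
            }
        }
        Ordering::Equal
    });
    idx
}

fn main() {
    // Two key columns: first DESC, then ASC as a tie-breaker.
    let ts = vec![1, 2, 2, 1];
    let id = vec![9, 3, 1, 4];
    let order = sorted_row_order(&[(ts, SortDirection::Desc), (id, SortDirection::Asc)]);
    println!("{:?}", order);
}
```

The SS-1 debug_assert mentioned above would then re-run the sort on the reordered rows and check that the result is the identity permutation.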
…publish Add compaction metadata to the PostgreSQL metastore:
Migration 27:
- 6 new columns: window_start, window_duration_secs, sort_fields, num_merge_ops, row_keys, zonemap_regexes
- Partial index idx_metrics_splits_compaction_scope on (index_uid, sort_fields, window_start) WHERE split_state = 'Published'
stage_metrics_splits:
- INSERT extended from 15 to 21 bind parameters for compaction columns
- ON CONFLICT SET updates all compaction columns
list_metrics_splits:
- PgMetricsSplit construction includes compaction fields (defaults from JSON)
Also fixes pre-existing compilation errors on upstream-10b-parquet-actors:
- Missing StageMetricsSplitsRequestExt import
- index_id vs index_uid type mismatches in publish/mark/delete
- IndexUid binding (to_string() for sqlx)
- ListMetricsSplitsResponseExt trait disambiguation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ublish validation Close critical gaps identified during port review:
split_writer.rs:
- Store table_config on ParquetSplitWriter (not just pass-through)
- Compute window_start from the batch time range using table_config.window_duration_secs
- Populate sort_fields, window_duration_secs, parquet_files on metadata before write
- Call write_to_file_with_metadata(Some(&metadata)) to embed KV metadata in the Parquet file
- Update size_bytes after the write completes
metastore/mod.rs:
- Add window_start and sort_fields fields to ListMetricsSplitsQuery
- Add with_compaction_scope() builder method
metastore/postgres/metastore.rs:
- Add compaction scope filters (AND window_start = $N, AND sort_fields = $N) to the list query
- Add replaced_split_ids count verification in publish_metrics_splits
- Bind compaction scope query parameters
ingest/config.rs:
- Add table_config: TableConfig field to ParquetIngestConfig
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
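Computing window_start from a timestamp and table_config.window_duration_secs amounts to truncating the timestamp to a window boundary. A minimal sketch under assumed types — the real helper's name and signature may differ:

```rust
/// Truncate a timestamp to the start of its compaction window.
/// Floor division so negative timestamps also round toward the window start.
fn window_start_secs(timestamp_secs: i64, window_duration_secs: i64) -> i64 {
    timestamp_secs.div_euclid(window_duration_secs) * window_duration_secs
}

fn main() {
    // 3725s into the epoch, 1-hour windows: the window starts at 3600.
    assert_eq!(window_start_secs(3_725, 3_600), 3_600);
    // Negative timestamps round down, not toward zero.
    assert_eq!(window_start_secs(-10, 3_600), -3_600);
    println!("ok");
}
```

In the split writer, this would be applied to the batch's minimum timestamp to derive the split's window_start before the metadata is written.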
…ad code removal
- file_backed_index/mod.rs: Add window_start and sort_fields filtering to metrics_split_matches_query() for compaction scope queries
- writer.rs: Add test_meta07_self_describing_parquet_roundtrip test (writes compaction metadata to Parquet, reads it back from a cold file, and verifies all fields round-trip correctly)
- fields.rs: Remove the dead sort_order() method (replaced by TableConfig)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…regexes
Gap 1: Change window_duration_secs from i32 to Option<i32> in both
PgMetricsSplit and InsertableMetricsSplit. Pre-Phase-31 splits now
correctly map 0 → NULL in PostgreSQL, enabling Phase 32 compaction
queries to use `WHERE window_duration_secs IS NOT NULL` instead of
the fragile `WHERE window_duration_secs > 0`.
Gap 2: Change zonemap_regexes from String to serde_json::Value in
both structs. This maps directly to JSONB in sqlx, avoiding ambiguity
when PostgreSQL JSONB operators are used in Phase 34/35 zonemap pruning.
Gap 3: Add two missing tests:
- test_insertable_from_metadata_with_compaction_fields: verifies all 6
compaction fields round-trip through InsertableMetricsSplit
- test_insertable_from_metadata_pre_phase31_defaults: verifies pre-Phase-31
metadata produces window_duration_secs: None, zonemap_regexes: json!({})
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
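The Gap 1 mapping of pre-Phase-31 values can be sketched in a few lines — function and field names here are hypothetical, not the PR's actual code:

```rust
/// Pre-Phase-31 metadata stores window_duration_secs = 0; map that to None
/// so it lands as NULL in PostgreSQL, letting compaction queries use
/// `WHERE window_duration_secs IS NOT NULL` rather than the fragile `> 0`.
fn normalize_window_duration(raw: i32) -> Option<i32> {
    if raw > 0 { Some(raw) } else { None }
}

fn main() {
    assert_eq!(normalize_window_duration(0), None);        // pre-Phase-31 split
    assert_eq!(normalize_window_duration(3_600), Some(3_600)); // windowed split
    println!("ok");
}
```

With sqlx, an `Option<i32>` field binds as NULL when `None`, which is what makes the `IS NOT NULL` predicate work without a sentinel value.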
…! macro 11 tests covering the full metrics split lifecycle:
- stage (happy path + non-existent index error)
- stage upsert (ON CONFLICT update)
- list by state, time range, metric name, compaction scope
- publish (happy path + non-existent split error)
- mark for deletion
- delete (happy path + idempotent non-existent)
Tests are generic and run against both the file-backed and PostgreSQL backends.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- publish_metrics_splits: return NotFound (not FailedPrecondition) when staged splits don't exist
- delete_metrics_splits: succeed silently (idempotent) for non-existent splits instead of returning FailedPrecondition
- Tests now assert the correct error types on both backends
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Migration 27: add maturity_timestamp, delete_opstamp, node_id columns and a publish_timestamp trigger to match the splits table (Paul's review)
- ListMetricsSplitsQuery: adopt FilterRange<i64> for time_range (matching the log-side pattern), with a single time_range field for both read and compaction paths; add node_id/delete_opstamp/update_timestamp/create_timestamp/mature filters to close gaps with ListSplitsQuery
- Use the SplitState enum instead of a stringly-typed Vec<String> for split_states
- StoredMetricsSplit: add create_timestamp, node_id, delete_opstamp, maturity_timestamp so the file-backed metastore can filter on them locally
- File-backed filter: use FilterRange::overlaps_with() for time range and window intersection; apply all new filters, matching the log-side predicate
- Postgres: intersection semantics for window queries, FilterRange-based SQL generation for all range filters
- Fix InsertableMetricsSplit.window_duration_secs from Option<i32> to i32
- Rename two-letter variables (ws, sf, dt) throughout
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Extract duplicated invariant logic into a shared `invariants/` module within `quickwit-dst`. This is the "single source of truth" layer in the verification pyramid, used by stateright models, production debug_assert checks, and (future) Datadog metrics emission. Key changes:
- `invariants/registry.rs`: InvariantId enum (20 variants) with Display
- `invariants/window.rs`: shared window_start_secs(), is_valid_window_duration()
- `invariants/sort.rs`: generic compare_with_null_ordering() for SS-2
- `invariants/check.rs`: check_invariant! macro wrapping debug_assert
- stateright gated behind the `model-checking` feature (optional dep)
- quickwit-parquet-engine uses the shared functions and check_invariant!
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
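A generic null-ordering comparison of the kind named above could look roughly like this — a std-only sketch whose signature may differ from the actual `invariants/sort.rs`, assuming a "nulls sort last" convention:

```rust
use std::cmp::Ordering;

/// Compare two nullable values with NULLs (None) ordered last,
/// the kind of total order an SS-2 sortedness check needs.
fn compare_with_null_ordering<T: Ord>(a: &Option<T>, b: &Option<T>) -> Ordering {
    match (a, b) {
        (Some(x), Some(y)) => x.cmp(y),
        (Some(_), None) => Ordering::Less,    // value sorts before NULL
        (None, Some(_)) => Ordering::Greater, // NULL sorts last
        (None, None) => Ordering::Equal,
    }
}

fn main() {
    let mut v = vec![None, Some(2), Some(1), None];
    v.sort_by(compare_with_null_ordering);
    assert_eq!(v, vec![Some(1), Some(2), None, None]);
    println!("ok");
}
```

Keeping this in one shared function is what lets the stateright model and the production debug_assert agree on the same ordering by construction.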
The check_invariant! macro now always evaluates its condition, not just in debug builds. This implements Layer 4 (Production) of the verification stack: invariant checks run in release, with results forwarded to a pluggable InvariantRecorder for Datadog metrics emission.
- Debug builds: panic on violation (debug_assert, Layer 3)
- All builds: evaluate the condition, call the recorder (Layer 4)
- set_invariant_recorder() wires up statsd at process startup
- No recorder registered = no-op (a single OnceLock load)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
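A minimal sketch of such a macro, assuming a recorder registered in a `OnceLock` — the names here are approximate, not the PR's actual code:

```rust
use std::sync::OnceLock;

/// Pluggable recorder, wired up once at process startup.
trait InvariantRecorder: Send + Sync {
    fn record(&self, invariant: &str, violated: bool);
}

static RECORDER: OnceLock<Box<dyn InvariantRecorder>> = OnceLock::new();

/// check_invariant! that always evaluates its condition:
/// record in all builds (Layer 4), panic only in debug builds (Layer 3).
macro_rules! check_invariant {
    ($id:expr, $cond:expr) => {{
        let ok = $cond; // evaluated in release too
        if let Some(rec) = RECORDER.get() {
            rec.record($id, !ok);
        }
        debug_assert!(ok, "invariant {} violated", $id);
    }};
}

struct StdoutRecorder;
impl InvariantRecorder for StdoutRecorder {
    fn record(&self, invariant: &str, violated: bool) {
        println!("{invariant} violated={violated}");
    }
}

fn main() {
    let _ = RECORDER.set(Box::new(StdoutRecorder));
    check_invariant!("SS-1", 1 + 1 == 2);
}
```

With no recorder registered, the only release-mode cost is the condition itself plus one `OnceLock` load, which matches the "no-op" note above.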
Emit cloudprem.pomsky.invariant.checked and .violated counters with invariant label via the metrics crate / DogStatsD exporter at process startup, completing Layer 4 of the verification stack. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This ADR contains company-specific information and should live in the private fork, not in the upstream quickwit-oss repo. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Rewrite CLAUDE.md as a generic Quickwit AI development guide
- Replace Quickhouse-Pomsky -> Quickwit branding across all docs
- Replace "Datadog" observability references with generic "production observability" language
- Remove the "Husky (Datadog)" qualifier from gap docs (keep Husky citations; the blog post is public)
- Generalize internal knowledge (query rate numbers, product-specific lateness guarantees)
- Remove the PomChi reference and a private Google Doc link
- Add docs/internals/UPSTREAM-CANDIDATES.md for pomsky tracking
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove all ClickHouse/ClickStack references from gap docs and ADRs (keep Prometheus, Mimir, InfluxDB, Husky as prior art)
- Restore gap-005 Option C (compaction-time dedup) without the ClickHouse citation
- Mark the /sesh-mode reference in CLAUDE.md as aspirational
- Add an aspirational-items section to UPSTREAM-CANDIDATES.md tracking items described in docs but not yet implemented (TLA+ specs, DST, Kani, Bloodhound, performance baselines, benchmark binaries)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
UPSTREAM-CANDIDATES.md incorrectly stated TLA+ specs and Stateright models don't exist. They do (contributed in #6246): ParquetDataModel.tla, SortSchema.tla, TimeWindowedCompaction.tla, plus quickwit-dst invariants and Stateright model tests. Updated to accurately reflect that the remaining aspirational piece is the simulation infrastructure (SimClock, FaultInjector, etc.). Also removed the /sesh-mode aspirational entry — it's actively being used and the underlying specs/models are real. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Prevents GSD planning artifacts from being committed to the repository. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reverts test env vars (CP_ENABLE_REVERSE_CONNECTION) and load-cloudprem-ui target — these are pomsky-specific and don't belong in upstream. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…treaming merge (#6281)
* feat: enforce physical column ordering in Parquet files
Sort schema columns are written first (in their configured sort order), followed by all remaining data columns in alphabetical order. This physical layout enables a two-GET streaming merge during compaction: the footer GET provides the schema and offsets, then a single streaming GET from the start of the row group delivers the sort columns first, allowing the compactor to compute the global merge order before the data columns arrive.
* test: verify input column order is actually scrambled
The sanity check only asserted presence, not ordering. It now verifies that host appears before service in the (scrambled) input, which is the opposite of the sort-schema order (service before host).
* style: rustfmt test code
* fix: collapse nested if to satisfy clippy::collapsible_if
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add a timeseries_id column (Int64) to the metrics Arrow batch, computed as a SipHash-2-4 of the series identity columns (metric_name, metric_type, and all tags excluding temporal/value columns). The hash uses fixed keys for cross-process determinism. The column is already declared in the metrics default sort schema (between host and timestamp_secs), so the parquet writer now automatically sorts by it and places it in the correct physical position. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
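A hedged sketch of the scheme described above, using std's (deprecated but functional) SipHash-2-4 implementation. The key constants, exclusion list, and function shape here are illustrative only, not the PR's actual values or code:

```rust
#![allow(deprecated)] // std::hash::SipHasher is deprecated but is still SipHash-2-4
use std::hash::{Hasher, SipHasher};

// Fixed keys for cross-process determinism (illustrative values).
const K0: u64 = 0x736f6d6570736575;
const K1: u64 = 0x646f72616e646f6d;
// Illustrative exclusion list; the real one also covers DDSketch columns.
const EXCLUDED_TAGS: &[&str] = &["timestamp_secs", "value"];

/// Hash the series identity: metric name, metric type, and all
/// non-excluded tags sorted by key so insertion order doesn't matter.
fn compute_timeseries_id(metric_name: &str, metric_type: u8, tags: &[(&str, &str)]) -> i64 {
    let mut h = SipHasher::new_with_keys(K0, K1);
    h.write(metric_name.as_bytes());
    h.write(&[0xFF]); // separator: "ab"+"c" must not collide with "a"+"bc"
    h.write(&[metric_type]);
    let mut sorted: Vec<&(&str, &str)> = tags
        .iter()
        .filter(|&&(k, _)| !EXCLUDED_TAGS.contains(&k))
        .collect();
    sorted.sort();
    for (k, v) in sorted {
        h.write(k.as_bytes());
        h.write(&[0xFF]);
        h.write(v.as_bytes());
        h.write(&[0xFF]);
    }
    h.finish() as i64
}

fn main() {
    // Same series, different tag insertion order: same id.
    let a = compute_timeseries_id("cpu", 0, &[("host", "h1"), ("env", "prod")]);
    let b = compute_timeseries_id("cpu", 0, &[("env", "prod"), ("host", "h1")]);
    assert_eq!(a, b);
    println!("ok");
}
```

Casting the `u64` digest to `i64` preserves all 64 bits, which fits the Int64 Arrow column while keeping the hash deterministic across processes.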
The timeseries_id hash is persisted to Parquet files — any change
silently corrupts compaction and queries. Add:
- 3 pinned stability tests with hardcoded expected hash values
- 3 proptest properties (order independence, excluded tag immunity,
extra-tag discrimination) each running 256 random cases
- Boundary ambiguity test ({"ab":"c"} vs {"a":"bc"})
- Same-series-different-timestamp invariant test
- All-excluded-tags coverage (every EXCLUDED_TAGS entry verified)
- Edge cases: empty strings, unicode, 100-tag cardinality
- Module-level doc explaining the stability contract
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
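The boundary-ambiguity test above guards against a classic pitfall, which a few lines demonstrate — this is a sketch, not the PR's test code:

```rust
#![allow(deprecated)] // std::hash::SipHasher: deprecated, still SipHash-2-4
use std::hash::{Hasher, SipHasher};

/// Without a separator, the raw byte streams for "ab"+"c" and "a"+"bc"
/// are identical, so the keyed hash cannot tell the pairs apart.
fn hash_pair(key: &str, val: &str, with_separator: bool) -> u64 {
    let mut h = SipHasher::new_with_keys(0, 0);
    h.write(key.as_bytes());
    if with_separator {
        h.write(&[0xFF]);
    }
    h.write(val.as_bytes());
    h.finish()
}

fn main() {
    // Ambiguous without a separator…
    assert_eq!(hash_pair("ab", "c", false), hash_pair("a", "bc", false));
    // …distinguished with one.
    assert_ne!(hash_pair("ab", "c", true), hash_pair("a", "bc", true));
    println!("ok");
}
```

This is exactly why the stability contract pins the terminator byte: it is part of the hash input, not just an implementation nicety.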
Mirror the CTE + FOR UPDATE pattern from delete_splits to prevent stale-state races. Without row locking, a concurrent mark_metrics_splits_for_deletion can commit between the state read and the DELETE, causing spurious FailedPrecondition errors and retry churn. The new query locks the target rows before reading their state, reports not-deletable (Staged/Published) and not-found splits separately, and only deletes when all requested splits are in MarkedForDeletion state. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
alanfgates
left a comment
One question on the metadata changes; other than that, lgtm.
```sql
WHERE
    index_uid = $1
    AND split_id = ANY($2)
FOR UPDATE
```
Adding FOR UPDATE means you're locking these rows. Is that what you want? I'm not clear on why this change requires the addition of this lock.
FWIW, Claude likes this addition, saying:
The addition of FOR UPDATE in the CTE subquery correctly prevents a TOCTOU race where concurrent mark_metrics_splits_for_deletion could change split state between the CTE read and the DELETE. This matches the established pattern in the non-metrics delete_splits at line 1076-1083 of the same file. Good fix.
```rust
//! implementation for `str` writes `bytes ++ [0xFF]` and for `u8` writes
//! `[byte]`; this has been stable since Rust 1.0. A pinned stability test
//! (`test_hash_stability_pinned`) will catch any regression.
```
Claude comments:
The stability contract relies on Rust's Hash trait implementation for str (writes bytes ++ [0xFF]) and u8 (writes [byte]). While this has been stable since Rust 1.0 and the pinned tests would catch a regression, this is not a guaranteed API contract — it's an implementation detail of the standard library. A Rust toolchain update could theoretically change it.
Recommendation: Consider hashing raw bytes directly (e.g., hasher.write(metric_name.as_bytes()); hasher.write(&[0xFF]);) rather than going through the Hash trait. This would make the stability explicit in the code rather than relying on an undocumented trait implementation. The pinned tests are a good safety net but catch the problem only after a build — explicit byte feeding prevents it.
It's not clear to me if the added safety here is worth the effort, especially considering a change to the hash implementation for str would probably break a lot of things.
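For concreteness, a sketch of the two approaches under discussion. The equality asserted below relies on the current (unguaranteed) standard-library behavior the module doc describes; the function names are invented for illustration:

```rust
#![allow(deprecated)] // std::hash::SipHasher: deprecated, still SipHash-2-4
use std::hash::{Hash, Hasher, SipHasher};

/// Route the string through the Hash trait: currently writes the bytes
/// followed by a 0xFF terminator, but that is an implementation detail.
fn via_hash_trait(s: &str) -> u64 {
    let mut h = SipHasher::new_with_keys(0, 0);
    s.hash(&mut h);
    h.finish()
}

/// Feed the same bytes explicitly, making the terminator part of the
/// code's own stability contract rather than the stdlib's.
fn via_explicit_bytes(s: &str) -> u64 {
    let mut h = SipHasher::new_with_keys(0, 0);
    h.write(s.as_bytes());
    h.write(&[0xFF]); // the terminator Hash for str currently appends
    h.finish()
}

fn main() {
    assert_eq!(via_hash_trait("service"), via_explicit_bytes("service"));
    println!("ok");
}
```

The explicit variant produces the same digests today, so switching to it would not invalidate already-persisted timeseries_id values, which is the main argument for doing it sooner rather than later if it is done at all.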
```rust
metric_type_builder.append_value(dp.metric_type as u8);
timestamp_secs_builder.append_value(dp.timestamp_secs);
value_builder.append_value(dp.value);
timeseries_id_builder.append_value(compute_timeseries_id(
```
Comment from Claude:
compute_timeseries_id is called per-row and sorts the tags each time. For batches where most rows share the same tag key set, this is redundant work. For typical batch sizes this is unlikely to be a bottleneck, but worth being aware of for large batches.
This seems valid, especially since we store one value per row: as compaction proceeds, identical tag sets will end up next to each other. Maybe it's too early to optimize this, but leaving a TODO noting we could do it later seems good.
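A sketch of what such a later optimization could look like, caching the sorted permutation per tag-key set so rows sharing a key set don't re-sort. This is entirely hypothetical, not code in this PR:

```rust
use std::collections::HashMap;

/// Memoize the sorted visiting order for each distinct tag-key set.
struct SortedKeyCache {
    cache: HashMap<Vec<String>, Vec<usize>>,
}

impl SortedKeyCache {
    fn new() -> Self {
        Self { cache: HashMap::new() }
    }

    /// Returns indices that visit `keys` in sorted order, computing the
    /// permutation only the first time a given key set is seen.
    fn sorted_order(&mut self, keys: &[&str]) -> &[usize] {
        let owned: Vec<String> = keys.iter().map(|k| k.to_string()).collect();
        self.cache.entry(owned).or_insert_with(|| {
            let mut idx: Vec<usize> = (0..keys.len()).collect();
            idx.sort_by_key(|&i| keys[i]);
            idx
        })
    }
}

fn main() {
    let mut cache = SortedKeyCache::new();
    // First call sorts; subsequent calls for the same key set are lookups.
    let order = cache.sorted_order(&["host", "env", "service"]).to_vec();
    assert_eq!(order, vec![1, 0, 2]); // env, host, service
    println!("ok");
}
```

Whether the cache lookup beats a small per-row sort depends on tag-set cardinality within a batch, which is why a TODO rather than an immediate change seems like the right call.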
Summary
- Adds a `timeseries_id` column (Int64) to the metrics Arrow batch, computed as a deterministic SipHash-2-4 of the series identity columns: `metric_name`, `metric_type`, and all tags
- Excludes temporal columns (`timestamp_secs`, `start_timestamp_secs`, `timestamp`) and value columns (`value`, plus the DDSketch components from "[metrics] Support DDSketch in the parquet pipeline" #6257: `count`, `sum`, `min`, `max`, `flags`, `keys`, `counts`)
- The column is already declared in the metrics default sort schema (`metric_name|service|env|datacenter|region|host|timeseries_id|timestamp_secs`, V2), so the writer automatically sorts by it and places it in the correct physical position
- Adds a `TimeseriesId` variant to the `ParquetField` enum and updates `SORT_ORDER`
Design reference
Sorted Series Column for QW Parquet Pipeline — this PR implements the Timeseries ID component; the full Sorted Series composite key is a follow-up.
Test plan
- Unit tests for `compute_timeseries_id` (determinism, exclusions, order independence, key/value non-interchangeability)
- Existing tests in `quickwit-parquet-engine` and `quickwit-opentelemetry` pass with updated column counts
- (`reorder_columns` is pre-existing, not introduced by this PR)
🤖 Generated with Claude Code