Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 15 additions & 7 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2625,23 +2625,31 @@ impl MetastoreService for PostgresqlMetastore {
if request.split_ids.is_empty() {
return Ok(EmptyResponse {});
}
let index_uid: IndexUid = request.index_uid().clone();

info!(
index_uid = %request.index_uid(),
split_ids = ?request.split_ids,
"deleting metrics splits"
);

// Only delete splits that are marked for deletion
// Match the non-metrics delete_splits pattern: distinguish
// "not found" (warn + succeed) from "not deletable" (FailedPrecondition).
// Match the non-metrics delete_splits pattern: CTE with FOR UPDATE
// to lock rows before reading state, avoiding stale-state races under
// concurrent mark_metrics_splits_for_deletion. Distinguishes "not found"
// (warn + succeed) from "not deletable" (FailedPrecondition).
const DELETE_SPLITS_QUERY: &str = r#"
WITH input_splits AS (
SELECT input_splits.split_id, metrics_splits.split_state
FROM UNNEST($2::text[]) AS input_splits(split_id)
LEFT JOIN metrics_splits
ON metrics_splits.index_uid = $1
AND metrics_splits.split_id = input_splits.split_id
LEFT JOIN (
SELECT split_id, split_state
FROM metrics_splits
WHERE
index_uid = $1
AND split_id = ANY($2)
FOR UPDATE
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding FOR UPDATE means you're locking these rows. Is that what you want? I'm not clear why this change requires the addition of this lock.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, Claude likes this addition, saying:

The addition of FOR UPDATE in the CTE subquery correctly prevents a TOCTOU race where concurrent mark_metrics_splits_for_deletion could change split state between the CTE read and the DELETE. This matches the established pattern in the non-metrics delete_splits at line 1076-1083 of the same file. Good fix.

) AS metrics_splits
USING (split_id)
),
deleted AS (
DELETE FROM metrics_splits
Expand Down Expand Up @@ -2680,7 +2688,7 @@ impl MetastoreService for PostgresqlMetastore {
.bind(&request.split_ids)
.fetch_one(&self.connection_pool)
.await
.map_err(|sqlx_error| convert_sqlx_err(&request.index_uid().index_id, sqlx_error))?;
.map_err(|sqlx_error| convert_sqlx_err(&index_uid.index_id, sqlx_error))?;

if !not_deletable_ids.is_empty() {
let message = format!(
Expand Down
45 changes: 33 additions & 12 deletions quickwit/quickwit-opentelemetry/src/otlp/arrow_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{
ArrayRef, Float64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder, UInt64Builder,
ArrayRef, Float64Builder, Int64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder,
UInt64Builder,
};
use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use quickwit_parquet_engine::timeseries_id::compute_timeseries_id;
use quickwit_proto::bytes::Bytes;
use quickwit_proto::ingest::{DocBatchV2, DocFormat};
use quickwit_proto::types::DocUid;
Expand Down Expand Up @@ -73,8 +75,9 @@ impl ArrowMetricsBatchBuilder {
}
let sorted_tag_keys: Vec<&str> = tag_keys.into_iter().collect();

// Build the Arrow schema dynamically
let mut fields = Vec::with_capacity(4 + sorted_tag_keys.len());
// Build the Arrow schema dynamically.
// 5 fixed columns: metric_name, metric_type, timestamp_secs, value, timeseries_id
let mut fields = Vec::with_capacity(5 + sorted_tag_keys.len());
fields.push(Field::new(
"metric_name",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
Expand All @@ -83,6 +86,9 @@ impl ArrowMetricsBatchBuilder {
fields.push(Field::new("metric_type", DataType::UInt8, false));
fields.push(Field::new("timestamp_secs", DataType::UInt64, false));
fields.push(Field::new("value", DataType::Float64, false));
// TODO: customer could submit a timeseries_id tag, and I don't think we want to explicitly
// reserve it.
fields.push(Field::new("timeseries_id", DataType::Int64, false));

for &tag_key in &sorted_tag_keys {
fields.push(Field::new(
Comment on lines +91 to 94
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Filter reserved timeseries_id tag before schema expansion

Adding the fixed timeseries_id field here without reserving that key in tag ingestion allows an input attribute named timeseries_id to be added again in the dynamic tag loop, producing duplicate column names in one RecordBatch. This path is reachable because create_number_data_point only strips REQUIRED_FIELDS (metric_name, metric_type, timestamp_secs, value), not timeseries_id. With duplicate names, downstream schema lookups resolve only one match, so one of the two timeseries_id columns is effectively shadowed/ambiguous and query behavior becomes incorrect for those inputs.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

@mattmkim mattmkim Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not impossible that a user would want to submit a timeseries_id tag, so we'll probably want to deal with this at some point. I also don't think we want to explicitly reserve the timeseries_id tag. Adding a TODO.

Expand All @@ -100,6 +106,7 @@ impl ArrowMetricsBatchBuilder {
let mut metric_type_builder = UInt8Builder::with_capacity(num_rows);
let mut timestamp_secs_builder = UInt64Builder::with_capacity(num_rows);
let mut value_builder = Float64Builder::with_capacity(num_rows);
let mut timeseries_id_builder = Int64Builder::with_capacity(num_rows);

let mut tag_builders: Vec<StringDictionaryBuilder<Int32Type>> = sorted_tag_keys
.iter()
Expand All @@ -122,6 +129,14 @@ impl ArrowMetricsBatchBuilder {
metric_type_builder.append_value(dp.metric_type as u8);
timestamp_secs_builder.append_value(dp.timestamp_secs);
value_builder.append_value(dp.value);
// TODO: can we not have to compute the timeseries for every point? especially with
// compaction, there may be many points with the same tags, in the same
// batch.
timeseries_id_builder.append_value(compute_timeseries_id(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment from Claude:

compute_timeseries_id is called per-row and sorts the tags each time. For batches where most rows share the same tag key set, this is redundant work. For typical batch sizes this is unlikely to be a bottleneck, but worth being aware of for large batches.

This seems valid, especially since we're storing one value per row, as compaction proceeds we'll be getting identical tag sets next to each other. Maybe it's too early to optimize this, but leaving a TODO noting we could do it later seems good.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will add a TODO

&dp.metric_name,
dp.metric_type as u8,
&dp.tags,
));

// Only touch builders for tags this data point has.
for (tag_key, tag_val) in &dp.tags {
Expand All @@ -145,11 +160,12 @@ impl ArrowMetricsBatchBuilder {
}
}

let mut arrays: Vec<ArrayRef> = Vec::with_capacity(4 + sorted_tag_keys.len());
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(5 + sorted_tag_keys.len());
arrays.push(Arc::new(metric_name_builder.finish()));
arrays.push(Arc::new(metric_type_builder.finish()));
arrays.push(Arc::new(timestamp_secs_builder.finish()));
arrays.push(Arc::new(value_builder.finish()));
arrays.push(Arc::new(timeseries_id_builder.finish()));

for tag_builder in &mut tag_builders {
arrays.push(Arc::new(tag_builder.finish()));
Expand Down Expand Up @@ -265,6 +281,10 @@ pub fn ipc_to_json_values(
serde_json::Value::Number(val.into())
}
}
DataType::Int64 => {
let arr = column.as_primitive::<arrow::datatypes::Int64Type>();
serde_json::Value::Number(arr.value(row_idx).into())
}
DataType::UInt64 => {
let arr = column.as_primitive::<arrow::datatypes::UInt64Type>();
serde_json::Value::Number(arr.value(row_idx).into())
Expand Down Expand Up @@ -387,8 +407,8 @@ mod tests {

let batch = builder.finish();
assert_eq!(batch.num_rows(), 1);
// 4 fixed columns + 9 tag columns
assert_eq!(batch.num_columns(), 13);
// 5 fixed columns + 9 tag columns
assert_eq!(batch.num_columns(), 14);
}

#[test]
Expand Down Expand Up @@ -471,8 +491,8 @@ mod tests {
let batch = builder.finish();

assert_eq!(batch.num_rows(), 1);
// 4 fixed columns + 1 tag column (service_name)
assert_eq!(batch.num_columns(), 5);
// 5 fixed columns + 1 tag column (service_name)
assert_eq!(batch.num_columns(), 6);
}

#[test]
Expand Down Expand Up @@ -507,8 +527,8 @@ mod tests {

let batch = builder.finish();
assert_eq!(batch.num_rows(), 2);
// 4 fixed + 3 tag columns (env, host, region) - sorted alphabetically
assert_eq!(batch.num_columns(), 7);
// 5 fixed + 3 tag columns (env, host, region) - sorted alphabetically
assert_eq!(batch.num_columns(), 8);

let schema = batch.schema();
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
Expand All @@ -519,6 +539,7 @@ mod tests {
"metric_type",
"timestamp_secs",
"value",
"timeseries_id",
"env",
"host",
"region",
Expand Down Expand Up @@ -569,8 +590,8 @@ mod tests {

let batch = builder.finish();
assert_eq!(batch.num_rows(), 3);
// 4 fixed + 2 tags (a, b)
assert_eq!(batch.num_columns(), 6);
// 5 fixed + 2 tags (a, b)
assert_eq!(batch.num_columns(), 7);

let schema = batch.schema();
let a_idx = schema.index_of("a").unwrap();
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-parquet-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ prost = { workspace = true }
quickwit-common = { workspace = true }
quickwit-dst = { workspace = true }
quickwit-proto = { workspace = true }
siphasher = { workspace = true }
sea-query = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-parquet-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod sort_fields;
pub mod split;
pub mod storage;
pub mod table_config;
pub mod timeseries_id;

#[cfg(any(test, feature = "testsuite"))]
pub mod test_helpers;
6 changes: 6 additions & 0 deletions quickwit/quickwit-parquet-engine/src/schema/fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub const SORT_ORDER: &[&str] = &[
"datacenter",
"region",
"host",
"timeseries_id",
"timestamp_secs",
];

Expand All @@ -41,6 +42,7 @@ pub enum ParquetField {
TimestampSecs,
StartTimestampSecs,
Value,
TimeseriesId,
TagService,
TagEnv,
TagDatacenter,
Expand All @@ -61,6 +63,7 @@ impl ParquetField {
Self::TimestampSecs => "timestamp_secs",
Self::StartTimestampSecs => "start_timestamp_secs",
Self::Value => "value",
Self::TimeseriesId => "timeseries_id",
Self::TagService => "tag_service",
Self::TagEnv => "tag_env",
Self::TagDatacenter => "tag_datacenter",
Expand Down Expand Up @@ -110,6 +113,8 @@ impl ParquetField {
Self::TimestampSecs | Self::StartTimestampSecs => DataType::UInt64,
// Metric value
Self::Value => DataType::Float64,
// Deterministic hash of timeseries identity columns
Self::TimeseriesId => DataType::Int64,
// Plain string for metric unit
Self::MetricUnit => DataType::Utf8,
// VARIANT type for semi-structured attributes
Expand Down Expand Up @@ -145,6 +150,7 @@ impl ParquetField {
Self::TimestampSecs,
Self::StartTimestampSecs,
Self::Value,
Self::TimeseriesId,
Self::TagService,
Self::TagEnv,
Self::TagDatacenter,
Expand Down
Loading
Loading