-
Notifications
You must be signed in to change notification settings - Fork 536
feat: compute deterministic timeseries_id column at ingest #6286
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2e01078
a73200b
64e4471
f846e6c
720f498
c8c0c10
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,11 +24,13 @@ use std::io::Cursor; | |
| use std::sync::Arc; | ||
|
|
||
| use arrow::array::{ | ||
| ArrayRef, Float64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder, UInt64Builder, | ||
| ArrayRef, Float64Builder, Int64Builder, RecordBatch, StringDictionaryBuilder, UInt8Builder, | ||
| UInt64Builder, | ||
| }; | ||
| use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema}; | ||
| use arrow::ipc::reader::StreamReader; | ||
| use arrow::ipc::writer::StreamWriter; | ||
| use quickwit_parquet_engine::timeseries_id::compute_timeseries_id; | ||
| use quickwit_proto::bytes::Bytes; | ||
| use quickwit_proto::ingest::{DocBatchV2, DocFormat}; | ||
| use quickwit_proto::types::DocUid; | ||
|
|
@@ -73,8 +75,9 @@ impl ArrowMetricsBatchBuilder { | |
| } | ||
| let sorted_tag_keys: Vec<&str> = tag_keys.into_iter().collect(); | ||
|
|
||
| // Build the Arrow schema dynamically | ||
| let mut fields = Vec::with_capacity(4 + sorted_tag_keys.len()); | ||
| // Build the Arrow schema dynamically. | ||
| // 5 fixed columns: metric_name, metric_type, timestamp_secs, value, timeseries_id | ||
| let mut fields = Vec::with_capacity(5 + sorted_tag_keys.len()); | ||
| fields.push(Field::new( | ||
| "metric_name", | ||
| DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), | ||
|
|
@@ -83,6 +86,9 @@ impl ArrowMetricsBatchBuilder { | |
| fields.push(Field::new("metric_type", DataType::UInt8, false)); | ||
| fields.push(Field::new("timestamp_secs", DataType::UInt64, false)); | ||
| fields.push(Field::new("value", DataType::Float64, false)); | ||
| // TODO: customer could submit a timeseries_id tag, and I don't think we want to explicitly | ||
| // reserve it. | ||
| fields.push(Field::new("timeseries_id", DataType::Int64, false)); | ||
|
|
||
| for &tag_key in &sorted_tag_keys { | ||
| fields.push(Field::new( | ||
|
Comment on lines
+91
to
94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Adding the fixed Useful? React with 👍 / 👎.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not impossible that a user would want to submit a |
||
|
|
@@ -100,6 +106,7 @@ impl ArrowMetricsBatchBuilder { | |
| let mut metric_type_builder = UInt8Builder::with_capacity(num_rows); | ||
| let mut timestamp_secs_builder = UInt64Builder::with_capacity(num_rows); | ||
| let mut value_builder = Float64Builder::with_capacity(num_rows); | ||
| let mut timeseries_id_builder = Int64Builder::with_capacity(num_rows); | ||
|
|
||
| let mut tag_builders: Vec<StringDictionaryBuilder<Int32Type>> = sorted_tag_keys | ||
| .iter() | ||
|
|
@@ -122,6 +129,14 @@ impl ArrowMetricsBatchBuilder { | |
| metric_type_builder.append_value(dp.metric_type as u8); | ||
| timestamp_secs_builder.append_value(dp.timestamp_secs); | ||
| value_builder.append_value(dp.value); | ||
| // TODO: can we not have to compute the timeseries for every point? especially with | ||
| // compaction, there may be many points with the same tags, in the same | ||
| // batch. | ||
| timeseries_id_builder.append_value(compute_timeseries_id( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment from Claude:
This seems valid, especially since we're storing one value per row, as compaction proceeds we'll be getting identical tag sets next to each other. Maybe it's too early to optimize this, but leaving a TODO noting we could do it later seems good.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will add a TODO |
||
| &dp.metric_name, | ||
| dp.metric_type as u8, | ||
| &dp.tags, | ||
| )); | ||
|
|
||
| // Only touch builders for tags this data point has. | ||
| for (tag_key, tag_val) in &dp.tags { | ||
|
|
@@ -145,11 +160,12 @@ impl ArrowMetricsBatchBuilder { | |
| } | ||
| } | ||
|
|
||
| let mut arrays: Vec<ArrayRef> = Vec::with_capacity(4 + sorted_tag_keys.len()); | ||
| let mut arrays: Vec<ArrayRef> = Vec::with_capacity(5 + sorted_tag_keys.len()); | ||
| arrays.push(Arc::new(metric_name_builder.finish())); | ||
| arrays.push(Arc::new(metric_type_builder.finish())); | ||
| arrays.push(Arc::new(timestamp_secs_builder.finish())); | ||
| arrays.push(Arc::new(value_builder.finish())); | ||
| arrays.push(Arc::new(timeseries_id_builder.finish())); | ||
|
|
||
| for tag_builder in &mut tag_builders { | ||
| arrays.push(Arc::new(tag_builder.finish())); | ||
|
|
@@ -265,6 +281,10 @@ pub fn ipc_to_json_values( | |
| serde_json::Value::Number(val.into()) | ||
| } | ||
| } | ||
| DataType::Int64 => { | ||
| let arr = column.as_primitive::<arrow::datatypes::Int64Type>(); | ||
| serde_json::Value::Number(arr.value(row_idx).into()) | ||
| } | ||
| DataType::UInt64 => { | ||
| let arr = column.as_primitive::<arrow::datatypes::UInt64Type>(); | ||
| serde_json::Value::Number(arr.value(row_idx).into()) | ||
|
|
@@ -387,8 +407,8 @@ mod tests { | |
|
|
||
| let batch = builder.finish(); | ||
| assert_eq!(batch.num_rows(), 1); | ||
| // 4 fixed columns + 9 tag columns | ||
| assert_eq!(batch.num_columns(), 13); | ||
| // 5 fixed columns + 9 tag columns | ||
| assert_eq!(batch.num_columns(), 14); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -471,8 +491,8 @@ mod tests { | |
| let batch = builder.finish(); | ||
|
|
||
| assert_eq!(batch.num_rows(), 1); | ||
| // 4 fixed columns + 1 tag column (service_name) | ||
| assert_eq!(batch.num_columns(), 5); | ||
| // 5 fixed columns + 1 tag column (service_name) | ||
| assert_eq!(batch.num_columns(), 6); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -507,8 +527,8 @@ mod tests { | |
|
|
||
| let batch = builder.finish(); | ||
| assert_eq!(batch.num_rows(), 2); | ||
| // 4 fixed + 3 tag columns (env, host, region) - sorted alphabetically | ||
| assert_eq!(batch.num_columns(), 7); | ||
| // 5 fixed + 3 tag columns (env, host, region) - sorted alphabetically | ||
| assert_eq!(batch.num_columns(), 8); | ||
|
|
||
| let schema = batch.schema(); | ||
| let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); | ||
|
|
@@ -519,6 +539,7 @@ mod tests { | |
| "metric_type", | ||
| "timestamp_secs", | ||
| "value", | ||
| "timeseries_id", | ||
| "env", | ||
| "host", | ||
| "region", | ||
|
|
@@ -569,8 +590,8 @@ mod tests { | |
|
|
||
| let batch = builder.finish(); | ||
| assert_eq!(batch.num_rows(), 3); | ||
| // 4 fixed + 2 tags (a, b) | ||
| assert_eq!(batch.num_columns(), 6); | ||
| // 5 fixed + 2 tags (a, b) | ||
| assert_eq!(batch.num_columns(), 7); | ||
|
|
||
| let schema = batch.schema(); | ||
| let a_idx = schema.index_of("a").unwrap(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding
FOR UPDATEmeans your locking these rows. Is that what you want? I'm not clear why this change requires the addition of this lock.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, Claude likes this addition, saying: