From e60f49c1685efd0cba074b57390ee1f827b101f4 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Fri, 29 May 2026 11:09:08 +0530 Subject: [PATCH 1/5] cli var for dataset stats --- src/cli.rs | 9 +++++++++ src/storage/object_storage.rs | 6 ++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index d6664ff2e..ef30ce68f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -559,6 +559,15 @@ pub struct Options { )] pub max_field_statistics: usize, + // toggle for calculating field statistics during parquet upload + #[arg( + long, + env = "P_CALCULATE_FIELD_STATISTICS", + default_value = "true", + help = "Enable or disable calculation of field statistics" + )] + pub calculate_field_statistics: bool, + #[arg( long, env = "P_MAX_EVENT_PAYLOAD_SIZE", diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b67cf18f9..560e0ff2c 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -184,8 +184,10 @@ async fn upload_single_parquet_file( let manifest = catalog::create_from_parquet_file(absolute_path, &path) .map_err(|e| (path.clone(), ObjectStorageError::from(e)))?; - // Calculate field stats if enabled - calculate_stats_if_enabled(&stream_name, &path, &schema, tenant_id).await; + if PARSEABLE.options.calculate_field_statistics { + // Calculate field stats if enabled + calculate_stats_if_enabled(&stream_name, &path, &schema, tenant_id).await; + } Ok(UploadResult { file_path: path, From d4365120977b2704b35c0bbd4b0491f6a6159c52 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Mon, 1 Jun 2026 19:06:32 +0530 Subject: [PATCH 2/5] perf: batch staging arrow writes Batch disk staging writes per output filename before writing to Arrow IPC. This reduces per-request writer mutex hold time and cuts the number of DiskWriter::write calls during high-volume OTEL metrics ingest. 
Also keep targeted hotpath probes around ingest, JSON conversion, staging, and parquet conversion paths, and skip object-store sync work for streams without local parquet/schema files. --- Cargo.toml | 8 ++ src/event/format/json.rs | 7 +- src/event/format/mod.rs | 23 ++++++ src/event/mod.rs | 1 + src/handlers/http/modal/utils/ingest_utils.rs | 1 + src/lib.rs | 2 +- src/otel/metrics.rs | 1 + src/parseable/staging/writer.rs | 76 ++++++++++++++++++- src/parseable/streams.rs | 60 +++++++-------- src/storage/object_storage.rs | 6 ++ src/utils/json/flatten.rs | 1 + src/utils/json/mod.rs | 3 + 12 files changed, 152 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eba916ec9..2a389776b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,14 @@ tokio = { version = "^1.43", default-features = false, features = [ tokio-stream = { version = "0.1.17", features = ["fs"] } tokio-util = { version = "0.7" } +# perf +hotpath = { version = "0.16.0", features = [ + "hotpath", + "hotpath-cpu", + "hotpath-alloc", + "tokio" +] } + # # Logging and Metrics # opentelemetry-proto = { version = "0.30.0", features = [ # "gen-tonic", diff --git a/src/event/format/json.rs b/src/event/format/json.rs index cc4ca90f3..90a560022 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -60,6 +60,7 @@ impl EventFormat for Event { // convert the incoming json to a vector of json values // also extract the arrow schema, tags and metadata from the incoming json + #[hotpath::measure] fn to_data( self, schema: &HashMap>, @@ -156,7 +157,7 @@ impl EventFormat for Event { infer_schema.clone(), ]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. 
{:?}", err))?; is_first = true; - let schema = infer_schema + let schema: Vec> = infer_schema .fields .iter() .filter(|field| !field.data_type().is_null()) @@ -327,6 +328,10 @@ fn rename_json_keys(values: Vec) -> Result, anyhow::Error> { .into_iter() .map(|value| { if let Value::Object(map) = value { + if !map.keys().any(|key| key.starts_with('@')) { + return Ok(Value::Object(map)); + } + // Collect original keys to check for collisions let original_keys: HashSet = map.keys().cloned().collect(); diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 5d8419a79..e3457016d 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -162,6 +162,7 @@ pub trait EventFormat: Sized { /// Returns the UTC time at ingestion fn get_p_timestamp(&self) -> DateTime; + #[hotpath::measure] fn into_recordbatch( self, storage_schema: &HashMap>, @@ -606,6 +607,28 @@ pub fn rename_per_record_type_mismatches( let Value::Object(map) = value else { return value; }; + let needs_rename = map.iter().any(|(key, val)| { + if val.is_null() { + return false; + } + let target_type = existing_schema + .get(key) + .map(|f| f.data_type()) + .or_else(|| inferred_types.get(key.as_str()).copied()); + let Some(target_type) = target_type else { + return false; + }; + if (val.is_array() || val.is_object()) + && (target_type.is_list() + || matches!(target_type, DataType::Struct(_) | DataType::Map(_, _))) + { + return false; + } + !value_compatible_with_type(val, target_type, schema_version) + }); + if !needs_rename { + return Value::Object(map); + } let new_map: serde_json::Map = map .into_iter() .map(|(key, val)| { diff --git a/src/event/mod.rs b/src/event/mod.rs index f9955e986..8f0dfebff 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -72,6 +72,7 @@ impl Event { is_first_event = self.is_first_event ) )] + #[hotpath::measure] pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { diff 
--git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index b353f1b29..3f3eeed15 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -156,6 +156,7 @@ pub async fn flatten_and_push_logs( skip(json, log_source, p_custom_fields, time_partition, telemetry_type, tenant_id), fields(stream_name, record_count = tracing::field::Empty) )] +#[hotpath::measure] pub fn push_logs( stream_name: &str, json: Value, diff --git a/src/lib.rs b/src/lib.rs index de7ee4a5f..036cba99c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,8 +66,8 @@ use once_cell::sync::Lazy; pub use openid; use parseable::PARSEABLE; use reqwest::{Client, ClientBuilder}; +pub use {hotpath, tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; pub use {opentelemetry, opentelemetry_otlp, opentelemetry_proto, opentelemetry_sdk}; -pub use {tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; // It is very unlikely that panic will occur when dealing with locks. 
pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock"; diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index c6f017c28..09ed31f3f 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -640,6 +640,7 @@ fn process_resource_metrics( /// this function performs the custom flattening of the otel metrics /// and returns a `Vec` of `Value::Object` of the flattened json +#[hotpath::measure] pub fn flatten_otel_metrics(message: MetricsData, tenant_id: &str) -> Vec { process_resource_metrics( &message.resource_metrics, diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index ec58800a3..ba3a8fb9f 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -19,7 +19,7 @@ use std::{ collections::{HashMap, HashSet}, - fs::{File, OpenOptions}, + fs::{self, File, OpenOptions}, io::BufWriter, path::PathBuf, sync::Arc, @@ -41,10 +41,83 @@ use crate::{ use super::StagingError; +const DISK_WRITE_BATCH_ROWS: usize = 16_384; + #[derive(Default)] pub struct Writer { pub mem: MemWriter<16384>, pub disk: HashMap, + disk_pending: HashMap, +} + +impl Writer { + #[hotpath::measure] + pub fn push_disk( + &mut self, + filename: String, + rb: &RecordBatch, + file_path: PathBuf, + range: TimeRange, + ) -> Result<(), StagingError> { + let pending = self.disk_pending.entry(filename.clone()).or_default(); + pending.rows += rb.num_rows(); + pending.range.get_or_insert(range); + pending.batches.push(rb.clone()); + + if pending.rows >= DISK_WRITE_BATCH_ROWS { + self.flush_pending_disk(&filename, file_path)?; + } + + Ok(()) + } + + pub fn flush_pending_disk( + &mut self, + filename: &str, + file_path: PathBuf, + ) -> Result<(), StagingError> { + let Some(pending) = self.disk_pending.remove(filename) else { + return Ok(()); + }; + if pending.batches.is_empty() { + return Ok(()); + } + + let schema = pending.batches[0].schema(); + let batch = concat_batches(&schema, pending.batches.iter())?; + match 
self.disk.get_mut(filename) { + Some(writer) => writer.write(&batch)?, + None => { + let range = pending.range.expect("pending disk batch must have range"); + if let Some(parent) = file_path.parent() { + fs::create_dir_all(parent)?; + } + let mut writer = DiskWriter::try_new(file_path, &schema, range)?; + writer.write(&batch)?; + self.disk.insert(filename.to_owned(), writer); + } + } + + Ok(()) + } + + pub fn flush_all_pending_disk( + &mut self, + data_path: &std::path::Path, + ) -> Result<(), StagingError> { + let filenames = self.disk_pending.keys().cloned().collect_vec(); + for filename in filenames { + self.flush_pending_disk(&filename, data_path.join(&filename))?; + } + Ok(()) + } +} + +#[derive(Default)] +struct PendingDiskBatch { + rows: usize, + batches: Vec, + range: Option, } pub struct DiskWriter { @@ -77,6 +150,7 @@ impl DiskWriter { } /// Write a single recordbatch into file + #[hotpath::measure] pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> { self.inner.write(rb).map_err(StagingError::Arrow) } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 8c1e55115..5c7c75984 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -43,7 +43,7 @@ use std::{ time::{Instant, SystemTime, UNIX_EPOCH}, }; use tokio::task::JoinSet; -use tracing::{Instrument, error, info, info_span, instrument, trace, warn}; +use tracing::{error, info, info_span, instrument, trace, warn}; use ulid::Ulid; use crate::{ @@ -65,11 +65,7 @@ use crate::{ use super::{ ARROW_FILE_EXTENSION, LogStream, PART_FILE_EXTENSION, - staging::{ - StagingError, - reader::MergedReverseRecordReader, - writer::{DiskWriter, Writer}, - }, + staging::{StagingError, reader::MergedReverseRecordReader, writer::Writer}, }; const INPROCESS_DIR_PREFIX: &str = "processing_"; @@ -115,7 +111,6 @@ pub struct Stream { pub writer: Mutex, pub ingestor_id: Option, } - impl Stream { pub fn new( options: Arc, @@ -138,6 +133,7 @@ impl Stream { } // Concatenates record 
batches and puts them in memory store for each event. + #[hotpath::measure] pub fn push( &self, schema_key: &str, @@ -173,25 +169,13 @@ impl Stream { if self.options.mode != Mode::Query || stream_type == StreamType::Internal { let filename = self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); - match guard.disk.get_mut(&filename) { - Some(writer) => { - writer.write(record)?; - } - None => { - // entry is not present thus we create it - std::fs::create_dir_all(&self.data_path)?; - - let range = TimeRange::granularity_range( - parsed_timestamp.and_local_timezone(Utc).unwrap(), - OBJECT_STORE_DATA_GRANULARITY, - ); - let file_path = self.data_path.join(&filename); - let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)?; + let range = TimeRange::granularity_range( + parsed_timestamp.and_local_timezone(Utc).unwrap(), + OBJECT_STORE_DATA_GRANULARITY, + ); + let file_path = self.data_path.join(&filename); - writer.write(record)?; - guard.disk.insert(filename, writer); - } - }; + guard.push_disk(filename, record, file_path, range)?; } guard.mem.push(schema_key, record)?; @@ -420,6 +404,7 @@ impl Stream { base.join(format!("{INPROCESS_DIR_PREFIX}{minute}")) } + #[hotpath::measure] pub fn parquet_files(&self) -> Vec { let Ok(dir) = self.data_path.read_dir() else { return vec![]; @@ -480,6 +465,7 @@ impl Stream { skip(self, tenant_id), fields(stream_name = %self.stream_name) )] + #[hotpath::measure] pub fn prepare_parquet( &self, init_signal: bool, @@ -559,6 +545,7 @@ impl Stream { Ok(()) } + #[hotpath::measure] pub fn flush(&self, forced: bool) -> Result<(), StagingError> { let _span = info_span!("flush", stream_name = %self.stream_name, forced).entered(); // Swap out stale writers under the lock, drop them after releasing it. 
@@ -571,6 +558,7 @@ impl Stream { self.stream_name, poisoned ))) })?; + writer.flush_all_pending_disk(&self.data_path)?; writer.mem.clear(); let mut old_disk = HashMap::new(); @@ -669,6 +657,7 @@ impl Stream { /// Bails out without sorting when either source column is missing /// (non-metric stream, schema drift) so the caller can write the /// batch unchanged. + #[hotpath::measure] fn sort_batch_for_metric_pruning( batch: &RecordBatch, time_partition_field: &str, @@ -750,6 +739,7 @@ impl Stream { /// This function reads arrow files, groups their schemas /// /// converts them into parquet files and returns a merged schema + #[hotpath::measure] pub fn convert_disk_files_to_parquet( &self, time_partition: Option<&String>, @@ -806,7 +796,6 @@ impl Stream { self.cleanup_arrow_files_and_dir(&arrow_files, tenant_id); } } - if schemas.is_empty() { return Ok(None); } @@ -814,6 +803,7 @@ impl Stream { Ok(Some(Schema::try_merge(schemas)?)) } + #[hotpath::measure] fn write_parquet_part_file( &self, part_path: &Path, @@ -847,10 +837,11 @@ impl Stream { // per-page (metric_name min, max) stats narrow to the slice // each page actually carries. 
let target = self.options.row_group_size; - let mut buffer: Vec = Vec::new(); + let mut buffer: Vec = Vec::with_capacity(record_reader.readers.len()); let mut buffered_rows: usize = 0; for record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { - buffered_rows += record.num_rows(); + let record_rows = record.num_rows(); + buffered_rows += record_rows; buffer.push(record); if buffered_rows >= target { let combined = arrow::compute::concat_batches(schema, &buffer)?; @@ -886,6 +877,7 @@ impl Stream { } /// function to validate parquet files + #[hotpath::measure] fn is_valid_parquet_file(path: &Path, stream_name: &str) -> bool { // First check file size as a quick validation match path.metadata() { @@ -928,6 +920,7 @@ impl Stream { } } + #[hotpath::measure] fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf], tenant_id: &Option) { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); for (i, file) in arrow_files.iter().enumerate() { @@ -1311,6 +1304,7 @@ impl Stream { skip(self, tenant_id), fields(stream_name = %self.stream_name) )] + #[hotpath::measure] pub fn flush_and_convert( &self, init_signal: bool, @@ -1481,12 +1475,10 @@ impl Streams { for stream in streams { let tenant = tenant_id.clone(); let span = info_span!("stream_sync", stream_name = %stream.stream_name); - joinset.spawn( - async move { - stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant)) - } - .instrument(span), - ); + joinset.spawn_blocking(move || { + let _guard = span.enter(); + stream.flush_and_convert(init_signal, shutdown_signal, &Some(tenant)) + }); } } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 560e0ff2c..4bc0b0dd6 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -1295,6 +1295,12 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { }; for tenant_id in tenants { for stream_name in PARSEABLE.streams.list(&tenant_id) { + if let Ok(stream) = 
PARSEABLE.get_stream(&stream_name, &tenant_id) + && stream.parquet_files().is_empty() + && stream.schema_files().is_empty() + { + continue; + } let object_store = object_store.clone(); let id = tenant_id.clone(); let span = info_span!("stream_upload", stream_name = %stream_name); diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index f14890aeb..f41d95017 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -63,6 +63,7 @@ pub enum JsonFlattenError { // Recursively flattens JSON objects and arrays, e.g. with the separator `.`, starting from the TOP // `{"key": "value", "nested_key": {"key":"value"}}` becomes `{"key": "value", "nested_key.key": "value"}` +#[hotpath::measure] pub fn flatten( nested_value: &mut Value, separator: &str, diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 1027baec6..e1f2596ab 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -34,6 +34,7 @@ pub mod strict; /// calls the function `flatten_json` which results Vec or Error /// in case when Vec is returned, converts the Vec to Value of Array /// this is to ensure recursive flattening does not happen for heavily nested jsons +#[hotpath::measure] pub fn flatten_json_body( body: Value, time_partition: Option<&String>, @@ -194,6 +195,7 @@ fn process_partitioned_non_array( } /// Processes data when no partitioning is configured (original logic) +#[hotpath::measure] fn process_non_partitioned( body: Value, time_partition: Option<&String>, @@ -217,6 +219,7 @@ fn process_non_partitioned( Ok(vec![data]) } +#[hotpath::measure] pub fn convert_array_to_object( body: Value, time_partition: Option<&String>, From c6f13557b99ad7a1ab5ee7ad0eb15c4f7e643191 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Mon, 1 Jun 2026 23:22:52 +0530 Subject: [PATCH 3/5] perf: improve otel metrics ingest path Batch staging arrow writes per output file and prepare OTEL metric parquet row groups off-thread before sequential writes. 
This reduces request-path writer contention and hides row-group concat/sort work behind merge/write. Reduce JSON allocation churn in OTEL metric flattening and generic flattening by reusing owned maps, pre-sizing containers, inserting attributes/exemplars directly, and avoiding per-row known-field set construction for series hashing. Also guard shutdown local sync against concurrent local sync cycles and avoid panicking on transient arrow-file metadata races. --- src/handlers/http/health_check.rs | 16 +- src/otel/logs.rs | 2 +- src/otel/metrics.rs | 271 +++++++++++++----------------- src/otel/otel_utils.rs | 64 +++---- src/parseable/staging/writer.rs | 92 +++++++--- src/parseable/streams.rs | 122 ++++++++++---- src/sync.rs | 35 ++++ src/utils/json/flatten.rs | 13 +- 8 files changed, 361 insertions(+), 254 deletions(-) diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index 1cd90f447..3c567f655 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -32,6 +32,7 @@ use once_cell::sync::Lazy; use tokio::{sync::Mutex, task::JoinSet}; use tracing::{error, info}; +use crate::sync::shutdown_local_sync_flush_and_convert; use crate::utils::get_tenant_id_from_request; use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams}; @@ -84,20 +85,7 @@ async fn perform_sync_operations() { } async fn perform_local_sync() { - let mut local_sync_joinset = JoinSet::new(); - - // Sync staging - PARSEABLE - .streams - .flush_and_convert(&mut local_sync_joinset, false, true); - - while let Some(res) = local_sync_joinset.join_next().await { - match res { - Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), - Ok(Err(err)) => error!("Failed to convert arrow files to parquet. 
{err:?}"), - Err(err) => error!("Failed to join async task: {err}"), - } - } + shutdown_local_sync_flush_and_convert().await; } async fn perform_object_store_sync() { diff --git a/src/otel/logs.rs b/src/otel/logs.rs index f3bf7c430..f6d770fad 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -138,7 +138,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { if log_record.body.is_some() { let body = &log_record.body; - let body_json = collect_json_from_values(body, &"body".to_string()); + let body_json = collect_json_from_values(body, "body"); for (key, value) in &body_json { // Always insert the original body field as is log_record_json.insert(key.clone(), value.clone()); diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 09ed31f3f..484ffbfb3 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -15,6 +15,7 @@ * along with this program. If not, see . * */ +use once_cell::sync::Lazy; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue; use opentelemetry_proto::tonic::metrics::v1::{ @@ -24,6 +25,8 @@ use opentelemetry_proto::tonic::metrics::v1::{ use serde_json::{Map, Value}; use rustc_hash::FxHasher; +use std::borrow::Cow; +use std::collections::HashSet; use std::hash::Hasher; use tracing::info_span; @@ -81,6 +84,9 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 37] = [ "__series_hash", ]; +static OTEL_METRICS_KNOWN_FIELDS: Lazy> = + Lazy::new(|| OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect()); + /// Compute a stable u64 identifier for the physical series a sample /// belongs to. Hashes `metric_name` plus every attribute key/value pair /// that survived OTel flattening — everything in the flattened data @@ -91,19 +97,17 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 37] = [ /// non-cryptographic) and feeds keys in sorted order so the hash /// doesn't depend on JSON Map iteration order. 
fn compute_series_hash(dp: &Map) -> u64 { - let known: std::collections::HashSet<&str> = - OTEL_METRICS_KNOWN_FIELD_LIST.iter().copied().collect(); - let mut label_pairs: Vec<(&str, String)> = dp - .iter() - .filter(|(k, _)| !known.contains(k.as_str())) - .map(|(k, v)| { - let v_str = match v { - Value::String(s) => s.clone(), - other => other.to_string(), - }; - (k.as_str(), v_str) - }) - .collect(); + let mut label_pairs: Vec<(&str, Cow<'_, str>)> = Vec::with_capacity(dp.len()); + for (key, value) in dp { + if OTEL_METRICS_KNOWN_FIELDS.contains(key.as_str()) { + continue; + } + let value = match value { + Value::String(s) => Cow::Borrowed(s.as_str()), + other => Cow::Owned(other.to_string()), + }; + label_pairs.push((key.as_str(), value)); + } label_pairs.sort_by(|a, b| a.0.cmp(b.0)); let mut hasher = FxHasher::default(); @@ -122,51 +126,42 @@ fn compute_series_hash(dp: &Map) -> u64 { hasher.finish() } -/// otel metrics event has json array for exemplar -/// this function flatten the exemplar json array -/// and returns a `Map` of the exemplar json -/// this function is reused in all json objects that have exemplar -fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec> { - exemplars - .iter() - .map(|exemplar| { - let mut exemplar_json = Map::new(); - insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); - exemplar_json.insert( - "exemplar_time_unix_nano".to_string(), - Value::String(convert_epoch_nano_to_timestamp( - exemplar.time_unix_nano as i64, - )), - ); - exemplar_json.insert( - "exemplar_span_id".to_string(), - Value::String(hex::encode(&exemplar.span_id)), - ); - exemplar_json.insert( - "exemplar_trace_id".to_string(), - Value::String(hex::encode(&exemplar.trace_id)), - ); - if let Some(value) = &exemplar.value { - match value { - ExemplarValue::AsDouble(double_val) => { - exemplar_json.insert( - "exemplar_value".to_string(), - serde_json::Number::from_f64(*double_val) - .map(Value::Number) - .unwrap_or(Value::Null), - ); - } - 
ExemplarValue::AsInt(int_val) => { - exemplar_json.insert( - "exemplar_value".to_string(), - Value::Number(serde_json::Number::from(*int_val)), - ); - } +fn insert_exemplars(map: &mut Map, exemplars: &[Exemplar]) { + for exemplar in exemplars { + insert_attributes(map, &exemplar.filtered_attributes); + map.insert( + "exemplar_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + exemplar.time_unix_nano as i64, + )), + ); + map.insert( + "exemplar_span_id".to_string(), + Value::String(hex::encode(&exemplar.span_id)), + ); + map.insert( + "exemplar_trace_id".to_string(), + Value::String(hex::encode(&exemplar.trace_id)), + ); + if let Some(value) = &exemplar.value { + match value { + ExemplarValue::AsDouble(double_val) => { + map.insert( + "exemplar_value".to_string(), + serde_json::Number::from_f64(*double_val) + .map(Value::Number) + .unwrap_or(Value::Null), + ); + } + ExemplarValue::AsInt(int_val) => { + map.insert( + "exemplar_value".to_string(), + Value::Number(serde_json::Number::from(*int_val)), + ); } } - exemplar_json - }) - .collect() + } + } } /// otel metrics event has json array for number data points @@ -177,7 +172,7 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec { @@ -239,11 +227,8 @@ fn flatten_gauge(gauge: &Gauge) -> Vec> { /// and returns a `Vec` of `Map` for each data point fn flatten_sum(sum: &Sum) -> Vec> { let mut data_points = flatten_number_data_points(&sum.data_points); - let agg_temp = flatten_aggregation_temporality(sum.aggregation_temporality); for dp in &mut data_points { - for (k, v) in &agg_temp { - dp.insert(k.clone(), v.clone()); - } + insert_aggregation_temporality(dp, sum.aggregation_temporality); dp.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); } data_points @@ -254,9 +239,9 @@ fn flatten_sum(sum: &Sum) -> Vec> { /// this function flatten the histogram json object /// and returns a `Vec` of `Map` for each data point fn flatten_histogram(histogram: 
&Histogram) -> Vec> { - let mut data_points_json = Vec::new(); + let mut data_points_json = Vec::with_capacity(histogram.data_points.len()); for data_point in &histogram.data_points { - let mut data_point_json = Map::new(); + let mut data_point_json = Map::with_capacity(data_point.attributes.len() + 10); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -301,29 +286,14 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { "data_point_explicit_bounds".to_string(), data_point_explicit_bounds, ); - let exemplar_json = flatten_exemplar(&data_point.exemplars); - if !exemplar_json.is_empty() { - for exemplar in exemplar_json { - for (key, value) in exemplar { - data_point_json.insert(key, value); - } - } - } + insert_exemplars(&mut data_point_json, &data_point.exemplars); - data_point_json.extend(flatten_data_point_flags(data_point.flags)); + insert_data_point_flags(&mut data_point_json, data_point.flags); insert_number_if_some(&mut data_point_json, "min", &data_point.min); insert_number_if_some(&mut data_point_json, "max", &data_point.max); + insert_aggregation_temporality(&mut data_point_json, histogram.aggregation_temporality); data_points_json.push(data_point_json); } - let mut histogram_json = Map::new(); - histogram_json.extend(flatten_aggregation_temporality( - histogram.aggregation_temporality, - )); - for data_point_json in &mut data_points_json { - for (key, value) in &histogram_json { - data_point_json.insert(key.clone(), value.clone()); - } - } data_points_json } @@ -331,7 +301,7 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { /// this function flatten the buckets json object /// and returns a `Map` of the flattened json fn flatten_buckets(bucket: &Buckets) -> Map { - let mut bucket_json = Map::new(); + let mut bucket_json = Map::with_capacity(2); bucket_json.insert("offset".to_string(), Value::Number(bucket.offset.into())); bucket_json.insert( 
"bucket_count".to_string(), @@ -351,9 +321,9 @@ fn flatten_buckets(bucket: &Buckets) -> Map { /// this function flatten the exponential histogram json object /// and returns a `Vec` of `Map` for each data point fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { - let mut data_points_json = Vec::new(); + let mut data_points_json = Vec::with_capacity(exp_histogram.data_points.len()); for data_point in &exp_histogram.data_points { - let mut data_point_json = Map::new(); + let mut data_point_json = Map::with_capacity(data_point.attributes.len() + 12); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -392,26 +362,11 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec Vec> { - let mut data_points_json = Vec::new(); + let mut data_points_json = Vec::with_capacity(summary.data_points.len()); for data_point in &summary.data_points { - let mut data_point_json = Map::new(); + let mut data_point_json = Map::with_capacity(data_point.attributes.len() + 6); insert_attributes(&mut data_point_json, &data_point.attributes); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -453,25 +408,20 @@ fn flatten_summary(summary: &Summary) -> Vec> { .quantile_values .iter() .map(|quantile_value| { - Value::Object( - vec![ - ( - "quantile", - serde_json::Number::from_f64(quantile_value.quantile) - .map(Value::Number) - .unwrap_or(Value::Null), - ), - ( - "value", - serde_json::Number::from_f64(quantile_value.value) - .map(Value::Number) - .unwrap_or(Value::Null), - ), - ] - .into_iter() - .map(|(k, v)| (k.to_string(), v)) - .collect(), - ) + let mut quantile_map = Map::with_capacity(2); + quantile_map.insert( + "quantile".to_string(), + serde_json::Number::from_f64(quantile_value.quantile) + .map(Value::Number) + .unwrap_or(Value::Null), + ); + quantile_map.insert( + "value".to_string(), + serde_json::Number::from_f64(quantile_value.value) + 
.map(Value::Number) + .unwrap_or(Value::Null), + ); + Value::Object(quantile_map) }) .collect(), ), @@ -505,29 +455,25 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec let metric_desc = Value::String(metrics_record.description.clone()); let metric_unit = Value::String(metrics_record.unit.clone()); let metric_type_val = Value::String(metric_type.to_string()); - let mut metadata = Map::new(); + let mut metadata = Map::with_capacity(metrics_record.metadata.len()); insert_attributes(&mut metadata, &metrics_record.metadata); if data_points.is_empty() { - let mut single = Map::new(); + let mut single = Map::with_capacity(metadata.len() + 8); single.insert("metric_name".to_string(), metric_name); single.insert("metric_description".to_string(), metric_desc); single.insert("metric_unit".to_string(), metric_unit); single.insert("metric_type".to_string(), metric_type_val); match &metrics_record.data { Some(metric::Data::Sum(sum)) => { - single.extend(flatten_aggregation_temporality(sum.aggregation_temporality)); + insert_aggregation_temporality(&mut single, sum.aggregation_temporality); single.insert("is_monotonic".to_string(), Value::Bool(sum.is_monotonic)); } Some(metric::Data::Histogram(histogram)) => { - single.extend(flatten_aggregation_temporality( - histogram.aggregation_temporality, - )); + insert_aggregation_temporality(&mut single, histogram.aggregation_temporality); } Some(metric::Data::ExponentialHistogram(exp_histogram)) => { - single.extend(flatten_aggregation_temporality( - exp_histogram.aggregation_temporality, - )); + insert_aggregation_temporality(&mut single, exp_histogram.aggregation_temporality); } _ => {} } @@ -549,6 +495,17 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec data_points } +fn metric_data_point_count(metric: &Metric) -> usize { + match &metric.data { + Some(metric::Data::Gauge(gauge)) => gauge.data_points.len(), + Some(metric::Data::Sum(sum)) => sum.data_points.len(), + 
Some(metric::Data::Histogram(histogram)) => histogram.data_points.len(), + Some(metric::Data::ExponentialHistogram(exp_histogram)) => exp_histogram.data_points.len(), + Some(metric::Data::Summary(summary)) => summary.data_points.len(), + None => 0, + } +} + /// Common function to process resource metrics and merge resource-level fields #[allow(clippy::too_many_arguments)] fn process_resource_metrics( @@ -571,7 +528,12 @@ fn process_resource_metrics( for resource_metric in resource_metrics { // Build resource-level fields once per resource - let mut resource_fields = Map::new(); + let mut resource_fields = Map::with_capacity( + get_resource(resource_metric) + .map(|resource| resource.attributes.len()) + .unwrap_or_default() + + 2, + ); if let Some(resource) = get_resource(resource_metric) { insert_attributes(&mut resource_fields, &resource.attributes); resource_fields.insert( @@ -606,6 +568,12 @@ fn process_resource_metrics( ); let metrics = get_metrics(scope_metric); + vec_otel_json.reserve( + metrics + .iter() + .map(|metric| metric_data_point_count(get_metric(metric)).max(1)) + .sum::(), + ); let date = chrono::Utc::now().date_naive().to_string(); increment_metrics_collected_by_date(metrics.len() as u64, &date, tenant_id); @@ -683,13 +651,8 @@ pub fn flatten_otel_metrics_protobuf( result } -/// otel metrics event has json object for aggregation temporality -/// there is a mapping of aggregation temporality to its description provided in proto -/// this function fetches the description from the aggregation temporality -/// and adds it to the flattened json -fn flatten_aggregation_temporality(aggregation_temporality: i32) -> Map { - let mut aggregation_temporality_json = Map::new(); - aggregation_temporality_json.insert( +fn insert_aggregation_temporality(map: &mut Map, aggregation_temporality: i32) { + map.insert( "aggregation_temporality".to_string(), Value::Number(aggregation_temporality.into()), ); @@ -699,27 +662,23 @@ fn 
flatten_aggregation_temporality(aggregation_temporality: i32) -> Map "CUMULATIVE", _ => "", }; - aggregation_temporality_json.insert( + map.insert( "aggregation_temporality_description".to_string(), Value::String(description.to_string()), ); - - aggregation_temporality_json } -fn flatten_data_point_flags(flags: u32) -> Map { - let mut data_point_flags_json = Map::new(); - data_point_flags_json.insert("data_point_flags".to_string(), Value::Number(flags.into())); +fn insert_data_point_flags(map: &mut Map, flags: u32) { + map.insert("data_point_flags".to_string(), Value::Number(flags.into())); let description = match flags { 0 => "DO_NOT_USE", 1 => "NO_RECORDED_VALUE_MASK", _ => "", }; - data_point_flags_json.insert( + map.insert( "data_point_flags_description".to_string(), Value::String(description.to_string()), ); - data_point_flags_json } #[cfg(test)] diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 88cb788d8..6caec1f21 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -23,21 +23,26 @@ use opentelemetry_proto::tonic::common::v1::{ use serde_json::{Map, Value}; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte -pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { +pub fn collect_json_from_value(key: &str, value: OtelValue) -> Map { let mut value_json: Map = Map::new(); + insert_json_from_value(&mut value_json, key, value); + value_json +} + +fn insert_json_from_value(map: &mut Map, key: &str, value: OtelValue) { match value { OtelValue::StringValue(str_val) => { - value_json.insert(key.to_string(), Value::String(str_val)); + map.insert(key.to_string(), Value::String(str_val)); } OtelValue::BoolValue(bool_val) => { - value_json.insert(key.to_string(), Value::Bool(bool_val)); + map.insert(key.to_string(), Value::Bool(bool_val)); } OtelValue::IntValue(int_val) => { - value_json.insert(key.to_string(), Value::String(int_val.to_string())); + map.insert(key.to_string(), 
Value::String(int_val.to_string())); } OtelValue::DoubleValue(double_val) => { if let Some(number) = serde_json::Number::from_f64(double_val) { - value_json.insert(key.to_string(), Value::Number(number)); + map.insert(key.to_string(), Value::Number(number)); } } OtelValue::ArrayValue(array_val) => { @@ -51,17 +56,15 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { // Create a JSON object to store the key-value list let kv_object = collect_json_from_key_value_list(kv_list_val); - for (key, value) in kv_object.iter() { - value_json.insert(key.clone(), value.clone()); - } + map.extend(kv_object); } OtelValue::BytesValue(bytes_val) => { - value_json.insert( + map.insert( key.to_string(), Value::String(String::from_utf8_lossy(&bytes_val).to_string()), ); @@ -72,15 +75,13 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map Value { - let mut json_array = Vec::new(); + let mut json_array = Vec::with_capacity(array_value.values.len()); for value in &array_value.values { if let Some(val) = &value.value { match val { @@ -122,12 +123,11 @@ fn collect_json_from_array_value(array_value: &ArrayValue) -> Value { /// The function iterates through the key-value pairs in the list /// and collects their JSON representations into a single Map fn collect_json_from_key_value_list(key_value_list: KeyValueList) -> Map { - let mut kv_list_json: Map = Map::new(); + let mut kv_list_json: Map = Map::with_capacity(key_value_list.values.len()); for key_value in key_value_list.values { if let Some(val) = key_value.value { if let Some(val) = val.value { - let json_value = collect_json_from_value(&key_value.key, val); - kv_list_json.extend(json_value); + insert_json_from_value(&mut kv_list_json, &key_value.key, val); } else { tracing::warn!("Key '{}' has no value in key-value list", key_value.key); } @@ -136,23 +136,25 @@ fn collect_json_from_key_value_list(key_value_list: KeyValueList) -> Map Map { +pub fn collect_json_from_anyvalue(key: &str, value: 
AnyValue) -> Map { collect_json_from_value(key, value.value.unwrap()) } //traverse through Value by calling function ollect_json_from_any_value -pub fn collect_json_from_values(values: &Option, key: &String) -> Map { - let mut value_json: Map = Map::new(); +pub fn collect_json_from_values(values: &Option, key: &str) -> Map { + let mut value_json: Map = Map::with_capacity(1); for value in values.iter() { - value_json = collect_json_from_anyvalue(key, value.clone()); + if let Some(value) = value.value.clone() { + insert_json_from_value(&mut value_json, key, value); + } } value_json } pub fn value_to_string(value: serde_json::Value) -> String { - match value.clone() { + match value { e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), Value::String(s) => s, _ => "".to_string(), @@ -160,13 +162,12 @@ pub fn value_to_string(value: serde_json::Value) -> String { } pub fn flatten_attributes(attributes: &[KeyValue]) -> Map { - let mut attributes_json: Map = Map::new(); + let mut attributes_json: Map = Map::with_capacity(attributes.len()); for attribute in attributes { - let key = &attribute.key; - let value = &attribute.value; - let value_json = collect_json_from_values(value, &key.to_string()); - for (attr_key, attr_val) in &value_json { - attributes_json.insert(attr_key.clone(), attr_val.clone()); + if let Some(value) = &attribute.value + && let Some(value) = value.value.clone() + { + insert_json_from_value(&mut attributes_json, &attribute.key, value); } } attributes_json @@ -193,9 +194,12 @@ pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Opt } pub fn insert_attributes(map: &mut Map, attributes: &[KeyValue]) { - let attributes_json = flatten_attributes(attributes); - for (key, value) in attributes_json { - map.insert(key, value); + for attribute in attributes { + if let Some(value) = &attribute.value + && let Some(value) = value.value.clone() + { + insert_json_from_value(map, &attribute.key, value); + } } } diff --git 
a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index ba3a8fb9f..351bbd1cc 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -41,7 +41,7 @@ use crate::{ use super::StagingError; -const DISK_WRITE_BATCH_ROWS: usize = 16_384; +const DISK_WRITE_BATCH_ROWS: usize = 32_768; #[derive(Default)] pub struct Writer { @@ -83,36 +83,82 @@ impl Writer { return Ok(()); } - let schema = pending.batches[0].schema(); - let batch = concat_batches(&schema, pending.batches.iter())?; - match self.disk.get_mut(filename) { - Some(writer) => writer.write(&batch)?, - None => { - let range = pending.range.expect("pending disk batch must have range"); - if let Some(parent) = file_path.parent() { - fs::create_dir_all(parent)?; - } - let mut writer = DiskWriter::try_new(file_path, &schema, range)?; - writer.write(&batch)?; - self.disk.insert(filename.to_owned(), writer); - } - } + write_pending_disk_batch(&mut self.disk, filename.to_owned(), pending, file_path)?; Ok(()) } - pub fn flush_all_pending_disk( + pub fn take_flushable_disk( &mut self, + forced: bool, + ) -> (HashMap, PendingDiskWrites) { + let mut flushable_disk = HashMap::new(); + let old_disk = std::mem::take(&mut self.disk); + for (filename, writer) in old_disk { + if !forced && writer.is_current() { + self.disk.insert(filename, writer); + } else { + flushable_disk.insert(filename, writer); + } + } + + let mut flushable_pending = HashMap::new(); + let old_pending = std::mem::take(&mut self.disk_pending); + for (filename, pending) in old_pending { + if !forced && pending.is_current() { + self.disk_pending.insert(filename, pending); + } else { + flushable_pending.insert(filename, pending); + } + } + + (flushable_disk, PendingDiskWrites(flushable_pending)) + } +} + +pub struct PendingDiskWrites(HashMap); + +impl PendingDiskWrites { + pub fn flush_into( + self, + disk: &mut HashMap, data_path: &std::path::Path, ) -> Result<(), StagingError> { - let filenames = 
self.disk_pending.keys().cloned().collect_vec(); - for filename in filenames { - self.flush_pending_disk(&filename, data_path.join(&filename))?; + for (filename, pending) in self.0 { + write_pending_disk_batch(disk, filename.clone(), pending, data_path.join(filename))?; } Ok(()) } } +fn write_pending_disk_batch( + disk: &mut HashMap, + filename: String, + pending: PendingDiskBatch, + file_path: PathBuf, +) -> Result<(), StagingError> { + if pending.batches.is_empty() { + return Ok(()); + } + + let schema = pending.batches[0].schema(); + let batch = concat_batches(&schema, pending.batches.iter())?; + match disk.get_mut(&filename) { + Some(writer) => writer.write(&batch)?, + None => { + let range = pending.range.expect("pending disk batch must have range"); + if let Some(parent) = file_path.parent() { + fs::create_dir_all(parent)?; + } + let mut writer = DiskWriter::try_new(file_path, &schema, range)?; + writer.write(&batch)?; + disk.insert(filename, writer); + } + } + + Ok(()) +} + #[derive(Default)] struct PendingDiskBatch { rows: usize, @@ -120,6 +166,14 @@ struct PendingDiskBatch { range: Option, } +impl PendingDiskBatch { + fn is_current(&self) -> bool { + self.range + .as_ref() + .is_some_and(|range| range.contains(Utc::now())) + } +} + pub struct DiskWriter { inner: StreamWriter>, path: PathBuf, diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 5c7c75984..3ef01def8 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -35,6 +35,7 @@ use parquet::{ use relative_path::RelativePathBuf; use std::sync::PoisonError; use std::{ + collections::VecDeque, collections::{HashMap, HashSet}, fs::{self, File, OpenOptions, remove_file, write}, num::NonZeroU32, @@ -69,6 +70,11 @@ use super::{ }; const INPROCESS_DIR_PREFIX: &str = "processing_"; +const METRIC_ROW_GROUP_PREP_IN_FLIGHT: usize = 1; + +struct PreparedMetricRowGroup { + batch: RecordBatch, +} /// Returns the filename for parquet if provided arrows file path is valid as per our 
expectation fn arrow_path_to_parquet( @@ -352,14 +358,13 @@ impl Stream { let mut arrow_files = self.arrow_files(); if !shutdown_signal { arrow_files.retain(|path| { - let creation = path - .metadata() + path.metadata() .ok() .and_then(|meta| meta.created().or_else(|_| meta.modified()).ok()) - .expect("Arrow file should have a valid creation or modified time"); - - // Compare if creation time is actually from previous minute - minute_from_system_time(creation) < minute_from_system_time(exclude) + .is_some_and(|creation| { + // Compare if creation time is actually from previous minute + minute_from_system_time(creation) < minute_from_system_time(exclude) + }) }); } arrow_files @@ -551,27 +556,17 @@ impl Stream { // Swap out stale writers under the lock, drop them after releasing it. // DiskWriter::Drop does I/O (IPC finish + file rename) so dropping // outside the lock avoids blocking concurrent push() calls. - let stale_writers = { + let (mut stale_writers, pending_writes) = { let mut writer = self.writer.lock().map_err(|poisoned| { StagingError::PoisonError(PoisonError::new(format!( "Writer lock poisoned while flushing data for stream {} - {}", self.stream_name, poisoned ))) })?; - writer.flush_all_pending_disk(&self.data_path)?; writer.mem.clear(); - - let mut old_disk = HashMap::new(); - std::mem::swap(&mut writer.disk, &mut old_disk); - if !forced { - for (k, v) in old_disk.drain() { - if v.is_current() { - writer.disk.insert(k, v); - } - } - } - old_disk + writer.take_flushable_disk(forced) }; + pending_writes.flush_into(&mut stale_writers, &self.data_path)?; // DiskWriter::Drop I/O happens here, outside the lock drop(stale_writers); Ok(()) @@ -699,6 +694,44 @@ impl Stream { Ok(RecordBatch::try_new(schema, columns)?) 
} + #[hotpath::measure] + fn prepare_metric_row_group( + schema: Arc, + buffer: Vec, + time_partition_field: String, + ) -> Result { + let combined = arrow::compute::concat_batches(&schema, &buffer)?; + let batch = Self::sort_batch_for_metric_pruning(&combined, &time_partition_field)?; + + Ok(PreparedMetricRowGroup { batch }) + } + + fn spawn_metric_row_group_prepare( + schema: Arc, + buffer: Vec, + time_partition_field: String, + ) -> std::sync::mpsc::Receiver> { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + rayon::spawn(move || { + let _ = tx.send(Self::prepare_metric_row_group( + schema, + buffer, + time_partition_field, + )); + }); + rx + } + + fn receive_prepared_metric_row_group( + rx: std::sync::mpsc::Receiver>, + ) -> Result { + rx.recv().map_err(|err| { + StagingError::ObjectStorage(std::io::Error::other(format!( + "Metric row-group preparation worker failed: {err}" + ))) + })? + } + fn reset_staging_metrics(&self, tenant_id: &Option) { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); metrics::STAGING_FILES @@ -837,32 +870,63 @@ impl Stream { // per-page (metric_name min, max) stats narrow to the slice // each page actually carries. 
let target = self.options.row_group_size; - let mut buffer: Vec = Vec::with_capacity(record_reader.readers.len()); + let buffer_capacity = record_reader.readers.len(); + let mut pending_row_groups = VecDeque::with_capacity(METRIC_ROW_GROUP_PREP_IN_FLIGHT); + let mut buffer: Vec = Vec::with_capacity(buffer_capacity); let mut buffered_rows: usize = 0; - for record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { + let mut merged_iter = + record_reader.merged_iter(schema.clone(), time_partition.cloned()); + loop { + let Some(record) = merged_iter.next() else { + break; + }; let record_rows = record.num_rows(); buffered_rows += record_rows; buffer.push(record); if buffered_rows >= target { - let combined = arrow::compute::concat_batches(schema, &buffer)?; - let sorted = - Self::sort_batch_for_metric_pruning(&combined, &time_partition_field)?; - writer.write(&sorted)?; + let row_group_buffer = + std::mem::replace(&mut buffer, Vec::with_capacity(buffer_capacity)); + let next_row_group = Self::spawn_metric_row_group_prepare( + schema.clone(), + row_group_buffer, + time_partition_field.clone(), + ); + pending_row_groups.push_back(next_row_group); + if pending_row_groups.len() > METRIC_ROW_GROUP_PREP_IN_FLIGHT + && let Some(rx) = pending_row_groups.pop_front() + { + let prepared = Self::receive_prepared_metric_row_group(rx)?; + writer.write(&prepared.batch)?; + } buffer.clear(); buffered_rows = 0; } } if !buffer.is_empty() { - let combined = arrow::compute::concat_batches(schema, &buffer)?; - let sorted = Self::sort_batch_for_metric_pruning(&combined, &time_partition_field)?; - writer.write(&sorted)?; + let next_row_group = Self::spawn_metric_row_group_prepare( + schema.clone(), + buffer, + time_partition_field.clone(), + ); + pending_row_groups.push_back(next_row_group); + if pending_row_groups.len() > METRIC_ROW_GROUP_PREP_IN_FLIGHT + && let Some(rx) = pending_row_groups.pop_front() + { + let prepared = Self::receive_prepared_metric_row_group(rx)?; 
+ writer.write(&prepared.batch)?; + } + } + while let Some(rx) = pending_row_groups.pop_front() { + let prepared = Self::receive_prepared_metric_row_group(rx)?; + writer.write(&prepared.batch)?; } + writer.close()?; } else { for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { writer.write(record)?; } + writer.close()?; } - writer.close()?; if !Self::is_valid_parquet_file(part_path, &self.stream_name) { error!( diff --git a/src/sync.rs b/src/sync.rs index 14a88ab38..ebdc1d057 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -47,6 +47,41 @@ impl Drop for SyncRunningGuard { } } +async fn wait_for_local_sync_guard() -> SyncRunningGuard { + let mut warned = false; + loop { + if LOCAL_SYNC_RUNNING + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + return SyncRunningGuard(&LOCAL_SYNC_RUNNING); + } + + if !warned { + warn!("Waiting for existing local_sync cycle before shutdown sync"); + warned = true; + } + sleep(Duration::from_millis(250)).await; + } +} + +pub async fn shutdown_local_sync_flush_and_convert() { + let _guard = wait_for_local_sync_guard().await; + let mut local_sync_joinset = JoinSet::new(); + + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, false, true); + + while let Some(res) = local_sync_joinset.join_next().await { + match res { + Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), + Ok(Err(err)) => error!("Failed to convert arrow files to parquet. 
{err:?}"), + Err(err) => error!("Failed to join async task: {err}"), + } + } +} + use crate::alerts::alert_enums::AlertTask; use crate::alerts::alerts_utils; use crate::parseable::PARSEABLE; diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index f41d95017..ace3a4a94 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -78,8 +78,9 @@ pub fn flatten( validate_time_partition(nested_dict, time_partition, time_partition_limit)?; validate_custom_partition(nested_dict, custom_partition)?; } - let mut map = Map::new(); - flatten_object(&mut map, None, nested_dict, separator)?; + let original = std::mem::take(nested_dict); + let mut map = Map::with_capacity(original.len()); + flatten_object(&mut map, None, original, separator)?; *nested_dict = map; } Value::Array(arr) => { @@ -222,24 +223,25 @@ pub fn validate_time_partition( fn flatten_object( output_map: &mut Map, parent_key: Option<&str>, - nested_map: &mut Map, + nested_map: Map, separator: &str, ) -> Result<(), JsonFlattenError> { for (key, mut value) in nested_map { let new_key = match parent_key { Some(parent) => format!("{parent}{separator}{key}"), - None => key.to_string(), + None => key, }; match &mut value { Value::Object(obj) => { + let obj = std::mem::take(obj); flatten_object(output_map, Some(&new_key), obj, separator)?; } Value::Array(arr) if arr.iter().any(Value::is_object) => { flatten_array_objects(output_map, &new_key, arr, separator)?; } _ => { - output_map.insert(new_key, std::mem::take(value)); + output_map.insert(new_key, value); } } } @@ -259,6 +261,7 @@ pub fn flatten_array_objects( match value { Value::Object(nested_object) => { let mut output_map = Map::new(); + let nested_object = std::mem::take(nested_object); flatten_object(&mut output_map, Some(parent_key), nested_object, separator)?; for (key, value) in output_map { let column = columns From abf7432168b29adc8bc415ad7e08ec2a56e574c8 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 2 Jun 2026 
12:44:16 +0530 Subject: [PATCH 4/5] added hotpath as a feature flag --- Cargo.toml | 3 ++- src/event/format/json.rs | 2 +- src/event/format/mod.rs | 2 +- src/event/mod.rs | 2 +- src/handlers/http/modal/utils/ingest_utils.rs | 2 +- src/lib.rs | 4 +++- src/otel/metrics.rs | 2 +- src/parseable/staging/writer.rs | 4 ++-- src/parseable/streams.rs | 22 +++++++++---------- src/utils/json/flatten.rs | 2 +- src/utils/json/mod.rs | 6 ++--- 11 files changed, 27 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2a389776b..82fa5ef86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ tokio-stream = { version = "0.1.17", features = ["fs"] } tokio-util = { version = "0.7" } # perf -hotpath = { version = "0.16.0", features = [ +hotpath = { version = "0.16.0", optional = true, features = [ "hotpath", "hotpath-cpu", "hotpath-alloc", @@ -219,6 +219,7 @@ assets-sha1 = "a7523ef819d38678275ae165c443564b2f9a3fc1" [features] debug = [] +hotpath = ["dep:hotpath"] kafka = [ "rdkafka", "rdkafka/ssl-vendored", diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 90a560022..02c9a51aa 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -60,7 +60,7 @@ impl EventFormat for Event { // convert the incoming json to a vector of json values // also extract the arrow schema, tags and metadata from the incoming json - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn to_data( self, schema: &HashMap>, diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index e3457016d..ae015a8e9 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -162,7 +162,7 @@ pub trait EventFormat: Sized { /// Returns the UTC time at ingestion fn get_p_timestamp(&self) -> DateTime; - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn into_recordbatch( self, storage_schema: &HashMap>, diff --git a/src/event/mod.rs b/src/event/mod.rs index 8f0dfebff..4820e854c 100644 --- a/src/event/mod.rs 
+++ b/src/event/mod.rs @@ -72,7 +72,7 @@ impl Event { is_first_event = self.is_first_event ) )] - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn process(self) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 3f3eeed15..0ecdb0486 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -156,7 +156,7 @@ pub async fn flatten_and_push_logs( skip(json, log_source, p_custom_fields, time_partition, telemetry_type, tenant_id), fields(stream_name, record_count = tracing::field::Empty) )] -#[hotpath::measure] +#[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn push_logs( stream_name: &str, json: Value, diff --git a/src/lib.rs b/src/lib.rs index 036cba99c..0e205cc0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,12 +62,14 @@ pub use datafusion; pub use handlers::http::modal::{ ParseableServer, ingest_server::IngestServer, query_server::QueryServer, server::Server, }; +#[cfg(feature = "hotpath")] +pub use hotpath; use once_cell::sync::Lazy; pub use openid; use parseable::PARSEABLE; use reqwest::{Client, ClientBuilder}; -pub use {hotpath, tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; pub use {opentelemetry, opentelemetry_otlp, opentelemetry_proto, opentelemetry_sdk}; +pub use {tracing_actix_web, tracing_opentelemetry, tracing_subscriber}; // It is very unlikely that panic will occur when dealing with locks. 
pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock"; diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 484ffbfb3..7d6a44d63 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -608,7 +608,7 @@ fn process_resource_metrics( /// this function performs the custom flattening of the otel metrics /// and returns a `Vec` of `Value::Object` of the flattened json -#[hotpath::measure] +#[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn flatten_otel_metrics(message: MetricsData, tenant_id: &str) -> Vec { process_resource_metrics( &message.resource_metrics, diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 351bbd1cc..a3fe96096 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -51,7 +51,7 @@ pub struct Writer { } impl Writer { - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn push_disk( &mut self, filename: String, @@ -204,7 +204,7 @@ impl DiskWriter { } /// Write a single recordbatch into file - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> { self.inner.write(rb).map_err(StagingError::Arrow) } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 3ef01def8..cfb3f6b45 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -139,7 +139,7 @@ impl Stream { } // Concatenates record batches and puts them in memory store for each event. 
- #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn push( &self, schema_key: &str, @@ -409,7 +409,7 @@ impl Stream { base.join(format!("{INPROCESS_DIR_PREFIX}{minute}")) } - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn parquet_files(&self) -> Vec { let Ok(dir) = self.data_path.read_dir() else { return vec![]; @@ -470,7 +470,7 @@ impl Stream { skip(self, tenant_id), fields(stream_name = %self.stream_name) )] - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn prepare_parquet( &self, init_signal: bool, @@ -550,7 +550,7 @@ impl Stream { Ok(()) } - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn flush(&self, forced: bool) -> Result<(), StagingError> { let _span = info_span!("flush", stream_name = %self.stream_name, forced).entered(); // Swap out stale writers under the lock, drop them after releasing it. @@ -652,7 +652,7 @@ impl Stream { /// Bails out without sorting when either source column is missing /// (non-metric stream, schema drift) so the caller can write the /// batch unchanged. - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn sort_batch_for_metric_pruning( batch: &RecordBatch, time_partition_field: &str, @@ -694,7 +694,7 @@ impl Stream { Ok(RecordBatch::try_new(schema, columns)?) 
} - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn prepare_metric_row_group( schema: Arc, buffer: Vec, @@ -772,7 +772,7 @@ impl Stream { /// This function reads arrow files, groups their schemas /// /// converts them into parquet files and returns a merged schema - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn convert_disk_files_to_parquet( &self, time_partition: Option<&String>, @@ -836,7 +836,7 @@ impl Stream { Ok(Some(Schema::try_merge(schemas)?)) } - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn write_parquet_part_file( &self, part_path: &Path, @@ -941,7 +941,7 @@ impl Stream { } /// function to validate parquet files - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn is_valid_parquet_file(path: &Path, stream_name: &str) -> bool { // First check file size as a quick validation match path.metadata() { @@ -984,7 +984,7 @@ impl Stream { } } - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf], tenant_id: &Option) { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); for (i, file) in arrow_files.iter().enumerate() { @@ -1368,7 +1368,7 @@ impl Stream { skip(self, tenant_id), fields(stream_name = %self.stream_name) )] - #[hotpath::measure] + #[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn flush_and_convert( &self, init_signal: bool, diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index ace3a4a94..d3a462ff2 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -63,7 +63,7 @@ pub enum JsonFlattenError { // Recursively flattens JSON objects and arrays, e.g. 
with the separator `.`, starting from the TOP // `{"key": "value", "nested_key": {"key":"value"}}` becomes `{"key": "value", "nested_key.key": "value"}` -#[hotpath::measure] +#[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn flatten( nested_value: &mut Value, separator: &str, diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index e1f2596ab..87405fe7c 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -34,7 +34,7 @@ pub mod strict; /// calls the function `flatten_json` which results Vec or Error /// in case when Vec is returned, converts the Vec to Value of Array /// this is to ensure recursive flattening does not happen for heavily nested jsons -#[hotpath::measure] +#[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn flatten_json_body( body: Value, time_partition: Option<&String>, @@ -195,7 +195,7 @@ fn process_partitioned_non_array( } /// Processes data when no partitioning is configured (original logic) -#[hotpath::measure] +#[cfg_attr(feature = "hotpath", hotpath::measure)] fn process_non_partitioned( body: Value, time_partition: Option<&String>, @@ -219,7 +219,7 @@ fn process_non_partitioned( Ok(vec![data]) } -#[hotpath::measure] +#[cfg_attr(feature = "hotpath", hotpath::measure)] pub fn convert_array_to_object( body: Value, time_partition: Option<&String>, From 131b123a11002ea92f20697f54cc48d478047bfe Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 2 Jun 2026 14:58:06 +0530 Subject: [PATCH 5/5] fix: coderabbit suggestions --- Cargo.toml | 10 +++++++++- src/cli.rs | 8 ++++---- src/main.rs | 1 + src/storage/object_storage.rs | 4 ++-- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 82fa5ef86..344021102 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -219,7 +219,15 @@ assets-sha1 = "a7523ef819d38678275ae165c443564b2f9a3fc1" [features] debug = [] -hotpath = ["dep:hotpath"] +hotpath = [ + "dep:hotpath", + "hotpath/hotpath", + "hotpath/hotpath-cpu", + "hotpath/hotpath-alloc", + 
"hotpath/tokio", +] +hotpath-alloc = ["hotpath"] +hotpath-cpu = ["hotpath"] kafka = [ "rdkafka", "rdkafka/ssl-vendored", diff --git a/src/cli.rs b/src/cli.rs index ef30ce68f..f380a02c4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -559,14 +559,14 @@ pub struct Options { )] pub max_field_statistics: usize, - // maximum limit to store the statistics for a field + // collect statistics for dataset fields #[arg( long, - env = "P_CALCULATE_FIELD_STATISTICS", + env = "P_COLLECT_DATASET_STATS", default_value = "true", - help = "Maximum number of field statistics to store" + help = "Collect statistics for dataset fields" )] - pub calculate_field_statistics: bool, + pub collect_dataset_stats: bool, #[arg( long, diff --git a/src/main.rs b/src/main.rs index 42cba34f5..4d07c1608 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,7 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry, fmt}; #[actix_web::main] +#[cfg_attr(feature = "hotpath", hotpath::main)] async fn main() -> anyhow::Result<()> { init_logger(); // Install the rustls crypto provider before any TLS operations. diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 4bc0b0dd6..a1831b5d8 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -184,8 +184,8 @@ async fn upload_single_parquet_file( let manifest = catalog::create_from_parquet_file(absolute_path, &path) .map_err(|e| (path.clone(), ObjectStorageError::from(e)))?; - if PARSEABLE.options.calculate_field_statistics { - // Calculate field stats if enabled + if PARSEABLE.options.collect_dataset_stats { + // collect field stats if enabled calculate_stats_if_enabled(&stream_name, &path, &schema, tenant_id).await; }