diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..e1f6bbd --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,4 @@ +* @DataDog/serverless-azure-gcp @DataDog/serverless-aws + +crates/datadog-serverless-compat/ @DataDog/serverless-azure-gcp +crates/datadog-trace-agent/ @DataDog/serverless-azure-gcp diff --git a/Cargo.lock b/Cargo.lock index dc618d7..5c82ea0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anyhow" version = "1.0.97" @@ -465,12 +471,14 @@ dependencies = [ "libdd-common", "libdd-trace-obfuscation", "libdd-trace-protobuf", + "libdd-trace-stats", "libdd-trace-utils", "rmp-serde", "serde", "serde_json", "serial_test", "tempfile", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -650,6 +658,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -839,6 +853,11 @@ name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "hashbrown" @@ -1275,7 +1294,7 @@ checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "libdd-common" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "cc", @@ -1305,10 +1324,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "libdd-ddsketch" +version = "1.0.0" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" +dependencies = [ + "prost", +] + [[package]] name = "libdd-tinybytes" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "serde", ] @@ -1316,7 +1343,7 @@ dependencies = [ [[package]] name = "libdd-trace-normalization" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "libdd-trace-protobuf", @@ -1325,7 +1352,7 @@ dependencies = [ [[package]] name = "libdd-trace-obfuscation" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "libdd-common", @@ -1342,17 +1369,28 @@ dependencies = [ [[package]] name = "libdd-trace-protobuf" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "prost", "serde", "serde_bytes", ] +[[package]] +name = "libdd-trace-stats" +version = "1.0.0" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" +dependencies = [ + "hashbrown 0.15.2", + "libdd-ddsketch", + "libdd-trace-protobuf", + "libdd-trace-utils", +] + [[package]] name = "libdd-trace-utils" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "bytes", diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c531df8..d9870e4 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -3,6 +3,7 @@ addr2line,https://github.com/gimli-rs/addr2line,Apache-2.0 OR MIT,The addr2line adler2,https://github.com/oyvindln/adler2,0BSD OR MIT OR Apache-2.0,"Jonas Schievink , oyvindln " ahash,https://github.com/tkaitchuck/ahash,MIT OR Apache-2.0,Tom Kaitchuck aho-corasick,https://github.com/BurntSushi/aho-corasick,Unlicense OR MIT,Andrew Gallant +allocator-api2,https://github.com/zakarumych/allocator-api2,MIT OR Apache-2.0,Zakarum anyhow,https://github.com/dtolnay/anyhow,MIT OR Apache-2.0,David Tolnay assert-json-diff,https://github.com/davidpdrsn/assert-json-diff,MIT,David Pedersen async-lock,https://github.com/smol-rs/async-lock,Apache-2.0 OR MIT,Stjepan Glavina @@ -54,6 +55,7 @@ fixedbitset,https://github.com/petgraph/fixedbitset,MIT OR Apache-2.0,bluss flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett " float-cmp,https://github.com/mikedilger/float-cmp,MIT,Mike Dilger fnv,https://github.com/servo/rust-fnv,Apache-2.0 OR MIT,Alex Crichton +foldhash,https://github.com/orlp/foldhash,Zlib,Orson Peters form_urlencoded,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers futures,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures Authors futures-channel,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-channel Authors @@ -109,10 +111,12 @@ lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2. lazycell,https://github.com/indiv0/lazycell,MIT OR Apache-2.0,"Alex Crichton , Nikita Pekin " libc,https://github.com/rust-lang/libc,MIT OR Apache-2.0,The Rust Project Developers libdd-common,https://github.com/DataDog/libdatadog/tree/main/datadog-common,Apache-2.0,The libdd-common Authors +libdd-ddsketch,https://github.com/DataDog/libdatadog/tree/main/libdd-ddsketch,Apache-2.0,The libdd-ddsketch Authors libdd-tinybytes,https://github.com/DataDog/libdatadog/tree/main/libdd-tinybytes,Apache-2.0,The libdd-tinybytes Authors libdd-trace-normalization,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-normalization,Apache-2.0,David Lee libdd-trace-obfuscation,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-obfuscation,Apache-2.0,Datadog Inc. libdd-trace-protobuf,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-protobuf,Apache-2.0,The libdd-trace-protobuf Authors +libdd-trace-stats,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-stats,Apache-2.0,The libdd-trace-stats Authors libdd-trace-utils,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-utils,Apache-2.0,The libdd-trace-utils Authors libloading,https://github.com/nagisa/rust_libloading,ISC,Simonas Kazlauskas linux-raw-sys,https://github.com/sunfishcode/linux-raw-sys,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,Dan Gohman diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index 068db28..1f91acf 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -7,7 +7,7 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env [dependencies] datadog-trace-agent = { path = "../datadog-trace-agent" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } dogstatsd = { path = "../dogstatsd", default-features = true } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7", default-features = false } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 5b91a91..5d10b90 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -18,7 +18,8 @@ use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, - config, env_verifier, mini_agent, stats_flusher, stats_processor, + config, env_verifier, mini_agent, stats_concentrator_service, stats_flusher, stats_generator, + stats_processor, trace_flusher::{self, TraceFlusher}, trace_processor, }; @@ -73,6 +74,9 @@ pub async fn main() { let dd_use_dogstatsd = env::var("DD_USE_DOGSTATSD") .map(|val| val.to_lowercase() != "false") .unwrap_or(true); + let dd_stats_computation_enabled = env::var("DD_STATS_COMPUTATION_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); let dd_statsd_metric_namespace: Option = env::var("DD_STATSD_METRIC_NAMESPACE") .ok() .and_then(|val| parse_metric_namespace(&val)); @@ -100,6 +104,7 @@ pub async fn main() { #[allow(clippy::expect_used)] tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + info!("Stats test - buffer length: configurable / default 90, bucket duration: 10"); debug!("Logging subsystem enabled"); @@ -124,6 +129,21 @@ pub async fn main() { Arc::clone(&config), )); + // Initialize stats concentrator service and generator conditionally + let (stats_concentrator_handle, stats_generator) = if dd_stats_computation_enabled { + info!("Stats computation enabled"); + let (stats_concentrator_service, stats_concentrator_handle) = + stats_concentrator_service::StatsConcentratorService::new(config.clone()); + tokio::spawn(stats_concentrator_service.run()); + let stats_generator = Arc::new(stats_generator::StatsGenerator::new( + stats_concentrator_handle.clone(), + )); + (Some(stats_concentrator_handle), Some(stats_generator)) + } else { + info!("Stats computation disabled"); + (None, None) + }; + let mini_agent = Box::new(mini_agent::MiniAgent { config: Arc::clone(&config), env_verifier, @@ -131,6 +151,8 @@ pub async fn main() { trace_flusher, stats_processor, stats_flusher, + stats_concentrator: stats_concentrator_handle, + stats_generator, }); tokio::spawn(async move { diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index 26b8290..0f05dd6 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -15,23 +15,25 @@ hyper-http-proxy = { version = "1.1.0", default-features = false, features = [ ] } hyper-util = { version = "0.1", features = ["service"] } http-body-util = "0.1" +thiserror = { version = "1.0", default-features = false } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } async-trait = "0.1.64" tracing = { version = "0.1", default-features = false } serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0" -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } -libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } -libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6", features = [ +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } +libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } +libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7", features = [ "mini_agent", ] } +libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } [dev-dependencies] rmp-serde = "1.1.1" serial_test = "2.0.0" duplicate = "0.4.1" tempfile = "3.3.0" -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6", features = [ +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7", features = [ "test-utils", ] } diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index e6ac540..b9e81b8 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -84,6 +84,8 @@ pub struct Config { pub stats_flush_interval: u64, /// how often to flush traces, in seconds pub trace_flush_interval: u64, + /// buffer length for span concentrator + pub stats_buffer_length: usize, pub trace_intake: Endpoint, pub trace_stats_intake: Endpoint, /// timeout for environment verification, in milliseconds @@ -131,6 +133,11 @@ impl Config { Tags::new() }; + let stats_buffer_length: usize = env::var("DD_STATS_BUFFER_LENGTH") + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(90); + #[allow(clippy::unwrap_used)] Ok(Config { app_name: Some(app_name), @@ -157,6 +164,7 @@ impl Config { .or_else(|_| env::var("HTTPS_PROXY")) .ok(), tags, + stats_buffer_length, }) } } diff --git a/crates/datadog-trace-agent/src/lib.rs b/crates/datadog-trace-agent/src/lib.rs index 165c263..ba4b7f3 100644 --- a/crates/datadog-trace-agent/src/lib.rs +++ b/crates/datadog-trace-agent/src/lib.rs @@ -12,7 +12,9 @@ pub mod config; pub mod env_verifier; pub mod http_utils; pub mod mini_agent; +pub mod stats_concentrator_service; pub mod stats_flusher; +pub mod stats_generator; pub mod stats_processor; pub mod trace_flusher; pub mod trace_processor; diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index dcfa6ae..7b94842 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -13,7 +13,10 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::{debug, error}; use crate::http_utils::log_and_create_http_response; -use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor}; +use crate::{ + config, env_verifier, stats_concentrator_service, stats_flusher, stats_generator, + stats_processor, trace_flusher, trace_processor, +}; use libdd_trace_protobuf::pb; use libdd_trace_utils::trace_utils; use libdd_trace_utils::trace_utils::SendData; @@ -32,6 +35,8 @@ pub struct MiniAgent { pub stats_processor: Arc, pub stats_flusher: Arc, pub env_verifier: Arc, + pub stats_concentrator: Option, + pub stats_generator: Option>, } impl MiniAgent { @@ -57,9 +62,33 @@ impl MiniAgent { // setup a channel to send processed traces to our flusher. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized // processed trace payloads to our trace flusher. - let (trace_tx, trace_rx): (Sender, Receiver) = + let (trace_tx_internal, trace_rx): (Sender, Receiver) = mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); + // Conditionally create an intercepting channel that generates stats from traces + let trace_tx = if let Some(stats_generator) = self.stats_generator.clone() { + let (trace_tx, mut trace_rx_intercept): (Sender, Receiver) = + mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); + + // Intercept traces to generate stats, then forward to trace flusher + tokio::spawn(async move { + while let Some(send_data) = trace_rx_intercept.recv().await { + // Generate stats from the trace payload + if let Err(e) = stats_generator.send(send_data.get_payloads()) { + error!("Failed to generate stats from traces: {e}"); + } + // Forward to trace flusher + if let Err(e) = trace_tx_internal.send(send_data).await { + error!("Failed to forward traces to flusher: {e}"); + } + } + }); + trace_tx + } else { + // If stats generation is disabled, use the internal channel directly + trace_tx_internal.clone() + }; + // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. let trace_flusher = self.trace_flusher.clone(); @@ -69,21 +98,61 @@ impl MiniAgent { }); // channels to send processed stats to our stats flusher. - let (stats_tx, stats_rx): ( + let (stats_tx, stats_rx_all): ( + Sender, + Receiver, + ) = mpsc::channel(STATS_PAYLOAD_CHANNEL_BUFFER_SIZE); + + // Channel for stats from concentrator (generated from traces) + let (stats_tx_generated, mut stats_rx_generated): ( Sender, Receiver, ) = mpsc::channel(STATS_PAYLOAD_CHANNEL_BUFFER_SIZE); + // Merge both stats channels into one for the flusher + let stats_tx_clone = stats_tx.clone(); + tokio::spawn(async move { + while let Some(stats) = stats_rx_generated.recv().await { + if let Err(e) = stats_tx_clone.send(stats).await { + error!("Failed to forward generated stats: {e}"); + } + } + }); + // start our stats flusher. let stats_flusher = self.stats_flusher.clone(); let stats_config = self.config.clone(); tokio::spawn(async move { let stats_flusher = stats_flusher.clone(); stats_flusher - .start_stats_flusher(stats_config, stats_rx) + .start_stats_flusher(stats_config, stats_rx_all) .await; }); + // Start periodic stats flush task only if stats computation is enabled + if let Some(stats_concentrator) = self.stats_concentrator.clone() { + let flush_interval = self.config.stats_flush_interval; + tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(flush_interval)).await; + match stats_concentrator.flush(false).await { + Ok(Some(stats_payload)) => { + debug!("Flushed generated stats from concentrator"); + if let Err(e) = stats_tx_generated.send(stats_payload).await { + error!("Failed to send flushed stats: {e}"); + } + } + Ok(None) => { + debug!("No generated stats to flush"); + } + Err(e) => { + error!("Error flushing generated stats: {e}"); + } + } + } + }); + } + // setup our hyper http server, where the endpoint_handler handles incoming requests let trace_processor = self.trace_processor.clone(); let stats_processor = self.stats_processor.clone(); diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs new file mode 100644 index 0000000..94bbdb0 --- /dev/null +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -0,0 +1,189 @@ +use crate::config::Config; +use libdd_trace_protobuf::pb; +use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload}; +use libdd_trace_stats::span_concentrator::SpanConcentrator; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::{mpsc, oneshot}; +use tracing::error; + +const S_TO_NS: u64 = 1_000_000_000; +const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds + +#[derive(Debug, thiserror::Error)] +pub enum StatsError { + #[error("Failed to send command to concentrator: {0}")] + SendError(mpsc::error::SendError), + #[error("Failed to receive response from concentrator: {0}")] + RecvError(oneshot::error::RecvError), +} + +#[derive(Clone, Debug, Default)] +pub struct TracerMetadata { + // e.g. "python" + pub language: String, + // e.g. "3.11.0" + pub tracer_version: String, + // e.g. "f45568ad09d5480b99087d86ebda26e6" + pub runtime_id: String, + pub container_id: String, +} + +pub enum ConcentratorCommand { + SetTracerMetadata(TracerMetadata), + // Use a box to reduce the size of the command enum + Add(Box), + Flush(bool, oneshot::Sender>), +} + +pub struct StatsConcentratorHandle { + tx: mpsc::UnboundedSender, + is_tracer_metadata_set: AtomicBool, +} + +impl Clone for StatsConcentratorHandle { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + // Cloning this may cause trace metadata to be set multiple times, + // but it's okay because it's the same for all traces and we don't need to be perfect on dedup. + is_tracer_metadata_set: AtomicBool::new( + self.is_tracer_metadata_set.load(Ordering::Acquire), + ), + } + } +} + +impl StatsConcentratorHandle { + #[must_use] + pub fn new(tx: mpsc::UnboundedSender) -> Self { + Self { + tx, + is_tracer_metadata_set: AtomicBool::new(false), + } + } + + pub fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> { + // Set tracer metadata only once for the first trace because + // it is the same for all traces. + if !self.is_tracer_metadata_set.load(Ordering::Acquire) { + self.is_tracer_metadata_set.store(true, Ordering::Release); + let tracer_metadata = TracerMetadata { + language: trace.language_name.clone(), + tracer_version: trace.tracer_version.clone(), + runtime_id: trace.runtime_id.clone(), + container_id: trace.container_id.clone(), + }; + self.tx + .send(ConcentratorCommand::SetTracerMetadata(tracer_metadata)) + .map_err(StatsError::SendError)?; + } + Ok(()) + } + + pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> { + self.tx + .send(ConcentratorCommand::Add(Box::new(span.clone()))) + .map_err(StatsError::SendError)?; + Ok(()) + } + + pub async fn flush(&self, force_flush: bool) -> Result, StatsError> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(ConcentratorCommand::Flush(force_flush, response_tx)) + .map_err(StatsError::SendError)?; + response_rx.await.map_err(StatsError::RecvError) + } +} + +pub struct StatsConcentratorService { + concentrator: SpanConcentrator, + rx: mpsc::UnboundedReceiver, + tracer_metadata: TracerMetadata, + config: Arc, +} + +// A service that handles add() and flush() requests in the same queue, +// to avoid using mutex, which may cause lock contention. +impl StatsConcentratorService { + #[must_use] + pub fn new(config: Arc) -> (Self, StatsConcentratorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let handle = StatsConcentratorHandle::new(tx); + // TODO: set span_kinds_stats_computed and peer_tag_keys + let concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_DURATION_NS), + SystemTime::now(), + vec![], + vec![], + config.stats_buffer_length, + ); + let service: StatsConcentratorService = Self { + concentrator, + rx, + // To be set when the first trace is received + tracer_metadata: TracerMetadata::default(), + config, + }; + (service, handle) + } + + pub async fn run(mut self) { + while let Some(command) = self.rx.recv().await { + match command { + ConcentratorCommand::SetTracerMetadata(tracer_metadata) => { + self.tracer_metadata = tracer_metadata; + } + ConcentratorCommand::Add(span) => self.concentrator.add_span(&*span), + ConcentratorCommand::Flush(force_flush, response_tx) => { + self.handle_flush(force_flush, response_tx); + } + } + } + } + + fn handle_flush( + &mut self, + force_flush: bool, + response_tx: oneshot::Sender>, + ) { + let stats_buckets = self.concentrator.flush(SystemTime::now(), force_flush); + let stats = if stats_buckets.is_empty() { + None + } else { + Some(ClientStatsPayload { + // Do not set hostname so the trace stats backend can aggregate stats properly + hostname: String::new(), + env: "test-env".to_string(), // TODO: handle in config + // Version is not in the trace payload. Need to read it from config. + version: "test-version".to_string(), // TODO: handle in config + lang: self.tracer_metadata.language.clone(), + tracer_version: self.tracer_metadata.tracer_version.clone(), + runtime_id: self.tracer_metadata.runtime_id.clone(), + // Not supported yet + sequence: 0, + // Not supported yet + agent_aggregation: String::new(), + service: "test-service".to_string(), // TODO: handle in config + container_id: self.tracer_metadata.container_id.clone(), + // Not supported yet + tags: vec![], + // Not supported yet + git_commit_sha: String::new(), + // Not supported yet + image_tag: String::new(), + stats: stats_buckets, + // Not supported yet + process_tags: String::new(), + // Not supported yet + process_tags_hash: 0, + }) + }; + let response = response_tx.send(stats); + if let Err(e) = response { + error!("Failed to return trace stats: {e:?}"); + } + } +} diff --git a/crates/datadog-trace-agent/src/stats_generator.rs b/crates/datadog-trace-agent/src/stats_generator.rs new file mode 100644 index 0000000..2bd730e --- /dev/null +++ b/crates/datadog-trace-agent/src/stats_generator.rs @@ -0,0 +1,49 @@ +use crate::stats_concentrator_service::{StatsConcentratorHandle, StatsError}; +use libdd_trace_utils::tracer_payload::TracerPayloadCollection; +use tracing::error; + +pub struct StatsGenerator { + stats_concentrator: StatsConcentratorHandle, +} + +#[derive(Debug, thiserror::Error)] +pub enum StatsGeneratorError { + #[error("Error sending trace stats to the stats concentrator: {0}")] + ConcentratorCommandError(StatsError), + #[error("Unsupported trace payload version. Failed to send trace stats.")] + TracePayloadVersionError, +} + +// Extracts information from traces related to stats and sends it to the stats concentrator +impl StatsGenerator { + #[must_use] + pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self { + Self { stats_concentrator } + } + + pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> { + if let TracerPayloadCollection::V07(traces) = traces { + for trace in traces { + // Set tracer metadata + if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace) { + error!("Failed to set tracer metadata: {err}"); + return Err(StatsGeneratorError::ConcentratorCommandError(err)); + } + + // Generate stats for each span in the trace + for chunk in &trace.chunks { + for span in &chunk.spans { + if let Err(err) = self.stats_concentrator.add(span) { + error!("Failed to send trace stats: {err}"); + return Err(StatsGeneratorError::ConcentratorCommandError(err)); + } + } + } + } + Ok(()) + } else { + error!("Unsupported trace payload version. Failed to send trace stats."); + Err(StatsGeneratorError::TracePayloadVersionError) + } + } +}