From 095d3750580c74e3032437ce63866c938f0b9321 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 17 Dec 2025 17:09:25 -0500 Subject: [PATCH 01/11] add codeowners --- .github/CODEOWNERS | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..e1f6bbd --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,4 @@ +* @DataDog/serverless-azure-gcp @DataDog/serverless-aws + +crates/datadog-serverless-compat/ @DataDog/serverless-azure-gcp +crates/datadog-trace-agent/ @DataDog/serverless-azure-gcp From 263d694373b44ad52c7a3ee466727dc491ed7367 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 3 Dec 2025 15:40:13 -0500 Subject: [PATCH 02/11] add stats generator and concentrator --- Cargo.lock | 38 ++++ crates/datadog-trace-agent/Cargo.toml | 2 + crates/datadog-trace-agent/src/lib.rs | 2 + .../src/stats_concentrator_service.rs | 188 ++++++++++++++++++ .../src/stats_generator.rs | 49 +++++ 5 files changed, 279 insertions(+) create mode 100644 crates/datadog-trace-agent/src/stats_concentrator_service.rs create mode 100644 crates/datadog-trace-agent/src/stats_generator.rs diff --git a/Cargo.lock b/Cargo.lock index dc618d7..83e23b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "anyhow" version = "1.0.97" @@ -465,12 +471,14 @@ dependencies = [ "libdd-common", "libdd-trace-obfuscation", "libdd-trace-protobuf", + "libdd-trace-stats", "libdd-trace-utils", "rmp-serde", "serde", "serde_json", "serial_test", "tempfile", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -650,6 +658,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -839,6 +853,11 @@ name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "hashbrown" @@ -1305,6 +1324,14 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "libdd-ddsketch" +version = "1.0.0" +source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +dependencies = [ + "prost", +] + [[package]] name = "libdd-tinybytes" version = "1.0.0" @@ -1349,6 +1376,17 @@ dependencies = [ "serde_bytes", ] +[[package]] +name = "libdd-trace-stats" +version = "1.0.0" +source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +dependencies = [ + "hashbrown 0.15.2", + "libdd-ddsketch", + "libdd-trace-protobuf", + "libdd-trace-utils", +] + [[package]] name = "libdd-trace-utils" version = "1.0.0" diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index 26b8290..f9fc90f 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -15,6 +15,7 @@ hyper-http-proxy = { version = "1.1.0", default-features = false, features = [ ] } hyper-util = { version = "0.1", features = ["service"] } http-body-util = "0.1" +thiserror = { version = "1.0", default-features = false } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } async-trait = "0.1.64" tracing = { version = "0.1", default-features = false } 
@@ -26,6 +27,7 @@ libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "4 libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6", features = [ "mini_agent", ] } +libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } [dev-dependencies] rmp-serde = "1.1.1" diff --git a/crates/datadog-trace-agent/src/lib.rs b/crates/datadog-trace-agent/src/lib.rs index 165c263..ba4b7f3 100644 --- a/crates/datadog-trace-agent/src/lib.rs +++ b/crates/datadog-trace-agent/src/lib.rs @@ -12,7 +12,9 @@ pub mod config; pub mod env_verifier; pub mod http_utils; pub mod mini_agent; +pub mod stats_concentrator_service; pub mod stats_flusher; +pub mod stats_generator; pub mod stats_processor; pub mod trace_flusher; pub mod trace_processor; diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs new file mode 100644 index 0000000..e6d6c70 --- /dev/null +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -0,0 +1,188 @@ +use crate::config::Config; +use libdd_trace_protobuf::pb; +use libdd_trace_protobuf::pb::{ClientStatsPayload, TracerPayload}; +use libdd_trace_stats::span_concentrator::SpanConcentrator; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; +use tokio::sync::{mpsc, oneshot}; +use tracing::error; + +const S_TO_NS: u64 = 1_000_000_000; +const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds + +#[derive(Debug, thiserror::Error)] +pub enum StatsError { + #[error("Failed to send command to concentrator: {0}")] + SendError(mpsc::error::SendError), + #[error("Failed to receive response from concentrator: {0}")] + RecvError(oneshot::error::RecvError), +} + +#[derive(Clone, Debug, Default)] +pub struct TracerMetadata { + // e.g. "python" + pub language: String, + // e.g. 
"3.11.0" + pub tracer_version: String, + // e.g. "f45568ad09d5480b99087d86ebda26e6" + pub runtime_id: String, + pub container_id: String, +} + +pub enum ConcentratorCommand { + SetTracerMetadata(TracerMetadata), + // Use a box to reduce the size of the command enum + Add(Box), + Flush(bool, oneshot::Sender>), +} + +pub struct StatsConcentratorHandle { + tx: mpsc::UnboundedSender, + is_tracer_metadata_set: AtomicBool, +} + +impl Clone for StatsConcentratorHandle { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + // Cloning this may cause trace metadata to be set multiple times, + // but it's okay because it's the same for all traces and we don't need to be perfect on dedup. + is_tracer_metadata_set: AtomicBool::new( + self.is_tracer_metadata_set.load(Ordering::Acquire), + ), + } + } +} + +impl StatsConcentratorHandle { + #[must_use] + pub fn new(tx: mpsc::UnboundedSender) -> Self { + Self { + tx, + is_tracer_metadata_set: AtomicBool::new(false), + } + } + + pub fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> { + // Set tracer metadata only once for the first trace because + // it is the same for all traces. 
+ if !self.is_tracer_metadata_set.load(Ordering::Acquire) { + self.is_tracer_metadata_set.store(true, Ordering::Release); + let tracer_metadata = TracerMetadata { + language: trace.language_name.clone(), + tracer_version: trace.tracer_version.clone(), + runtime_id: trace.runtime_id.clone(), + container_id: trace.container_id.clone(), + }; + self.tx + .send(ConcentratorCommand::SetTracerMetadata(tracer_metadata)) + .map_err(StatsError::SendError)?; + } + Ok(()) + } + + pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> { + self.tx + .send(ConcentratorCommand::Add(Box::new(span.clone()))) + .map_err(StatsError::SendError)?; + Ok(()) + } + + pub async fn flush(&self, force_flush: bool) -> Result, StatsError> { + let (response_tx, response_rx) = oneshot::channel(); + self.tx + .send(ConcentratorCommand::Flush(force_flush, response_tx)) + .map_err(StatsError::SendError)?; + response_rx.await.map_err(StatsError::RecvError) + } +} + +pub struct StatsConcentratorService { + concentrator: SpanConcentrator, + rx: mpsc::UnboundedReceiver, + tracer_metadata: TracerMetadata, + config: Arc, +} + +// A service that handles add() and flush() requests in the same queue, +// to avoid using mutex, which may cause lock contention. 
+impl StatsConcentratorService { + #[must_use] + pub fn new(config: Arc) -> (Self, StatsConcentratorHandle) { + let (tx, rx) = mpsc::unbounded_channel(); + let handle = StatsConcentratorHandle::new(tx); + // TODO: set span_kinds_stats_computed and peer_tag_keys + let concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_DURATION_NS), + SystemTime::now(), + vec![], + vec![], + ); + let service: StatsConcentratorService = Self { + concentrator, + rx, + // To be set when the first trace is received + tracer_metadata: TracerMetadata::default(), + config, + }; + (service, handle) + } + + pub async fn run(mut self) { + while let Some(command) = self.rx.recv().await { + match command { + ConcentratorCommand::SetTracerMetadata(tracer_metadata) => { + self.tracer_metadata = tracer_metadata; + } + ConcentratorCommand::Add(span) => self.concentrator.add_span(&*span), + ConcentratorCommand::Flush(force_flush, response_tx) => { + self.handle_flush(force_flush, response_tx); + } + } + } + } + + fn handle_flush( + &mut self, + force_flush: bool, + response_tx: oneshot::Sender>, + ) { + let stats_buckets = self.concentrator.flush(SystemTime::now(), force_flush); + let stats = if stats_buckets.is_empty() { + None + } else { + Some(ClientStatsPayload { + // Do not set hostname so the trace stats backend can aggregate stats properly + hostname: String::new(), + env: "test-env".to_string(), // TODO: handle in config + // Version is not in the trace payload. Need to read it from config. 
+ version: "test-version".to_string(), // TODO: handle in config + lang: self.tracer_metadata.language.clone(), + tracer_version: self.tracer_metadata.tracer_version.clone(), + runtime_id: self.tracer_metadata.runtime_id.clone(), + // Not supported yet + sequence: 0, + // Not supported yet + agent_aggregation: String::new(), + service: "test-service".to_string(), // TODO: handle in config + container_id: self.tracer_metadata.container_id.clone(), + // Not supported yet + tags: vec![], + // Not supported yet + git_commit_sha: String::new(), + // Not supported yet + image_tag: String::new(), + stats: stats_buckets, + // Not supported yet + process_tags: String::new(), + // Not supported yet + process_tags_hash: 0, + }) + }; + let response = response_tx.send(stats); + if let Err(e) = response { + error!("Failed to return trace stats: {e:?}"); + } + } +} diff --git a/crates/datadog-trace-agent/src/stats_generator.rs b/crates/datadog-trace-agent/src/stats_generator.rs new file mode 100644 index 0000000..2bd730e --- /dev/null +++ b/crates/datadog-trace-agent/src/stats_generator.rs @@ -0,0 +1,49 @@ +use crate::stats_concentrator_service::{StatsConcentratorHandle, StatsError}; +use libdd_trace_utils::tracer_payload::TracerPayloadCollection; +use tracing::error; + +pub struct StatsGenerator { + stats_concentrator: StatsConcentratorHandle, +} + +#[derive(Debug, thiserror::Error)] +pub enum StatsGeneratorError { + #[error("Error sending trace stats to the stats concentrator: {0}")] + ConcentratorCommandError(StatsError), + #[error("Unsupported trace payload version. 
Failed to send trace stats.")] + TracePayloadVersionError, +} + +// Extracts information from traces related to stats and sends it to the stats concentrator +impl StatsGenerator { + #[must_use] + pub fn new(stats_concentrator: StatsConcentratorHandle) -> Self { + Self { stats_concentrator } + } + + pub fn send(&self, traces: &TracerPayloadCollection) -> Result<(), StatsGeneratorError> { + if let TracerPayloadCollection::V07(traces) = traces { + for trace in traces { + // Set tracer metadata + if let Err(err) = self.stats_concentrator.set_tracer_metadata(trace) { + error!("Failed to set tracer metadata: {err}"); + return Err(StatsGeneratorError::ConcentratorCommandError(err)); + } + + // Generate stats for each span in the trace + for chunk in &trace.chunks { + for span in &chunk.spans { + if let Err(err) = self.stats_concentrator.add(span) { + error!("Failed to send trace stats: {err}"); + return Err(StatsGeneratorError::ConcentratorCommandError(err)); + } + } + } + } + Ok(()) + } else { + error!("Unsupported trace payload version. 
Failed to send trace stats."); + Err(StatsGeneratorError::TracePayloadVersionError) + } + } +} From 5740557999fb44ae75a2c8c49d1c24c08ee72d8a Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 3 Dec 2025 16:20:07 -0500 Subject: [PATCH 03/11] implement trace stats generator and concentrator --- crates/datadog-serverless-compat/src/main.rs | 13 +++- crates/datadog-trace-agent/src/mini_agent.rs | 72 ++++++++++++++++++-- 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 5b91a91..a87f911 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -18,7 +18,8 @@ use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, - config, env_verifier, mini_agent, stats_flusher, stats_processor, + config, env_verifier, mini_agent, stats_concentrator_service, stats_flusher, stats_generator, + stats_processor, trace_flusher::{self, TraceFlusher}, trace_processor, }; @@ -124,6 +125,14 @@ pub async fn main() { Arc::clone(&config), )); + // Initialize stats concentrator service and generator + let (stats_concentrator_service, stats_concentrator_handle) = + stats_concentrator_service::StatsConcentratorService::new(config.clone()); + tokio::spawn(stats_concentrator_service.run()); + let stats_generator = Arc::new(stats_generator::StatsGenerator::new( + stats_concentrator_handle.clone(), + )); + let mini_agent = Box::new(mini_agent::MiniAgent { config: Arc::clone(&config), env_verifier, @@ -131,6 +140,8 @@ pub async fn main() { trace_flusher, stats_processor, stats_flusher, + stats_concentrator: stats_concentrator_handle, + stats_generator, }); tokio::spawn(async move { diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index dcfa6ae..bae1fa6 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ 
b/crates/datadog-trace-agent/src/mini_agent.rs @@ -13,7 +13,10 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::{debug, error}; use crate::http_utils::log_and_create_http_response; -use crate::{config, env_verifier, stats_flusher, stats_processor, trace_flusher, trace_processor}; +use crate::{ + config, env_verifier, stats_concentrator_service, stats_flusher, stats_generator, + stats_processor, trace_flusher, trace_processor, +}; use libdd_trace_protobuf::pb; use libdd_trace_utils::trace_utils; use libdd_trace_utils::trace_utils::SendData; @@ -32,9 +35,12 @@ pub struct MiniAgent { pub stats_processor: Arc, pub stats_flusher: Arc, pub env_verifier: Arc, + pub stats_concentrator: stats_concentrator_service::StatsConcentratorHandle, + pub stats_generator: Arc, } impl MiniAgent { + pub async fn start_mini_agent(&self) -> Result<(), Box> { let now = Instant::now(); @@ -57,9 +63,28 @@ impl MiniAgent { // setup a channel to send processed traces to our flusher. tx is passed through each // endpoint_handler to the trace processor, which uses it to send de-serialized // processed trace payloads to our trace flusher. 
- let (trace_tx, trace_rx): (Sender, Receiver) = + let (trace_tx_internal, trace_rx): (Sender, Receiver) = + mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); + + // Create an intercepting channel that generates stats from traces + let (trace_tx, mut trace_rx_intercept): (Sender, Receiver) = mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); + // Intercept traces to generate stats, then forward to trace flusher + let stats_generator = self.stats_generator.clone(); + tokio::spawn(async move { + while let Some(send_data) = trace_rx_intercept.recv().await { + // Generate stats from the trace payload + if let Err(e) = stats_generator.send(send_data.get_payloads()) { + error!("Failed to generate stats from traces: {e}"); + } + // Forward to trace flusher + if let Err(e) = trace_tx_internal.send(send_data).await { + error!("Failed to forward traces to flusher: {e}"); + } + } + }); + // start our trace flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. let trace_flusher = self.trace_flusher.clone(); @@ -69,21 +94,60 @@ impl MiniAgent { }); // channels to send processed stats to our stats flusher. - let (stats_tx, stats_rx): ( + let (stats_tx, stats_rx_all): ( Sender, Receiver, ) = mpsc::channel(STATS_PAYLOAD_CHANNEL_BUFFER_SIZE); + // Channel for stats from concentrator (generated from traces) + let (stats_tx_generated, mut stats_rx_generated): ( + Sender, + Receiver, + ) = mpsc::channel(STATS_PAYLOAD_CHANNEL_BUFFER_SIZE); + + // Merge both stats channels into one for the flusher + let stats_tx_clone = stats_tx.clone(); + tokio::spawn(async move { + while let Some(stats) = stats_rx_generated.recv().await { + if let Err(e) = stats_tx_clone.send(stats).await { + error!("Failed to forward generated stats: {e}"); + } + } + }); + // start our stats flusher. 
let stats_flusher = self.stats_flusher.clone(); let stats_config = self.config.clone(); tokio::spawn(async move { let stats_flusher = stats_flusher.clone(); stats_flusher - .start_stats_flusher(stats_config, stats_rx) + .start_stats_flusher(stats_config, stats_rx_all) .await; }); + // Start periodic stats flush task + let stats_concentrator = self.stats_concentrator.clone(); + let flush_interval = self.config.stats_flush_interval; + tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(flush_interval)).await; + match stats_concentrator.flush(false).await { + Ok(Some(stats_payload)) => { + debug!("Flushed generated stats from concentrator"); + if let Err(e) = stats_tx_generated.send(stats_payload).await { + error!("Failed to send flushed stats: {e}"); + } + } + Ok(None) => { + debug!("No generated stats to flush"); + } + Err(e) => { + error!("Error flushing generated stats: {e}"); + } + } + } + }); + // setup our hyper http server, where the endpoint_handler handles incoming requests let trace_processor = self.trace_processor.clone(); let stats_processor = self.stats_processor.clone(); From e1cf182fdb1b179faaf5785dd0da17f20a8f3b10 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 3 Dec 2025 16:26:03 -0500 Subject: [PATCH 04/11] fix formatting and add licenses --- LICENSE-3rdparty.csv | 4 ++++ crates/datadog-trace-agent/src/mini_agent.rs | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c531df8..d9870e4 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -3,6 +3,7 @@ addr2line,https://github.com/gimli-rs/addr2line,Apache-2.0 OR MIT,The addr2line adler2,https://github.com/oyvindln/adler2,0BSD OR MIT OR Apache-2.0,"Jonas Schievink , oyvindln " ahash,https://github.com/tkaitchuck/ahash,MIT OR Apache-2.0,Tom Kaitchuck aho-corasick,https://github.com/BurntSushi/aho-corasick,Unlicense OR MIT,Andrew Gallant 
+allocator-api2,https://github.com/zakarumych/allocator-api2,MIT OR Apache-2.0,Zakarum anyhow,https://github.com/dtolnay/anyhow,MIT OR Apache-2.0,David Tolnay assert-json-diff,https://github.com/davidpdrsn/assert-json-diff,MIT,David Pedersen async-lock,https://github.com/smol-rs/async-lock,Apache-2.0 OR MIT,Stjepan Glavina @@ -54,6 +55,7 @@ fixedbitset,https://github.com/petgraph/fixedbitset,MIT OR Apache-2.0,bluss flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett " float-cmp,https://github.com/mikedilger/float-cmp,MIT,Mike Dilger fnv,https://github.com/servo/rust-fnv,Apache-2.0 OR MIT,Alex Crichton +foldhash,https://github.com/orlp/foldhash,Zlib,Orson Peters form_urlencoded,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers futures,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures Authors futures-channel,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-channel Authors @@ -109,10 +111,12 @@ lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2. lazycell,https://github.com/indiv0/lazycell,MIT OR Apache-2.0,"Alex Crichton , Nikita Pekin " libc,https://github.com/rust-lang/libc,MIT OR Apache-2.0,The Rust Project Developers libdd-common,https://github.com/DataDog/libdatadog/tree/main/datadog-common,Apache-2.0,The libdd-common Authors +libdd-ddsketch,https://github.com/DataDog/libdatadog/tree/main/libdd-ddsketch,Apache-2.0,The libdd-ddsketch Authors libdd-tinybytes,https://github.com/DataDog/libdatadog/tree/main/libdd-tinybytes,Apache-2.0,The libdd-tinybytes Authors libdd-trace-normalization,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-normalization,Apache-2.0,David Lee libdd-trace-obfuscation,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-obfuscation,Apache-2.0,Datadog Inc. 
libdd-trace-protobuf,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-protobuf,Apache-2.0,The libdd-trace-protobuf Authors +libdd-trace-stats,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-stats,Apache-2.0,The libdd-trace-stats Authors libdd-trace-utils,https://github.com/DataDog/libdatadog/tree/main/libdd-trace-utils,Apache-2.0,The libdd-trace-utils Authors libloading,https://github.com/nagisa/rust_libloading,ISC,Simonas Kazlauskas linux-raw-sys,https://github.com/sunfishcode/linux-raw-sys,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,Dan Gohman diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index bae1fa6..caf75bb 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -40,7 +40,6 @@ pub struct MiniAgent { } impl MiniAgent { - pub async fn start_mini_agent(&self) -> Result<(), Box> { let now = Instant::now(); From be227f45a2ba430bd87a5414455bea27ede16c2a Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Tue, 9 Dec 2025 11:24:13 -0500 Subject: [PATCH 05/11] add DD_STATS_COMPUTATION_ENABLED env var --- crates/datadog-serverless-compat/src/main.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index a87f911..fc56ca3 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -74,6 +74,9 @@ pub async fn main() { let dd_use_dogstatsd = env::var("DD_USE_DOGSTATSD") .map(|val| val.to_lowercase() != "false") .unwrap_or(true); + let dd_stats_computation_enabled = env::var("DD_STATS_COMPUTATION_ENABLED") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); let dd_statsd_metric_namespace: Option = env::var("DD_STATSD_METRIC_NAMESPACE") .ok() .and_then(|val| parse_metric_namespace(&val)); From 0997c939f5cbf9864b1032091286cd1423dfe8cb Mon Sep 17 00:00:00 2001 From: Duncan 
Harvey Date: Wed, 10 Dec 2025 14:32:17 -0500 Subject: [PATCH 06/11] make agent stats computation conditional --- crates/datadog-serverless-compat/src/main.rs | 21 +++-- crates/datadog-trace-agent/src/mini_agent.rs | 80 +++++++++++--------- 2 files changed, 57 insertions(+), 44 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index fc56ca3..a657d9b 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -128,13 +128,20 @@ pub async fn main() { Arc::clone(&config), )); - // Initialize stats concentrator service and generator - let (stats_concentrator_service, stats_concentrator_handle) = - stats_concentrator_service::StatsConcentratorService::new(config.clone()); - tokio::spawn(stats_concentrator_service.run()); - let stats_generator = Arc::new(stats_generator::StatsGenerator::new( - stats_concentrator_handle.clone(), - )); + // Initialize stats concentrator service and generator conditionally + let (stats_concentrator_handle, stats_generator) = if dd_stats_computation_enabled { + info!("Stats computation enabled"); + let (stats_concentrator_service, stats_concentrator_handle) = + stats_concentrator_service::StatsConcentratorService::new(config.clone()); + tokio::spawn(stats_concentrator_service.run()); + let stats_generator = Arc::new(stats_generator::StatsGenerator::new( + stats_concentrator_handle.clone(), + )); + (Some(stats_concentrator_handle), Some(stats_generator)) + } else { + info!("Stats computation disabled"); + (None, None) + }; let mini_agent = Box::new(mini_agent::MiniAgent { config: Arc::clone(&config), diff --git a/crates/datadog-trace-agent/src/mini_agent.rs b/crates/datadog-trace-agent/src/mini_agent.rs index caf75bb..7b94842 100644 --- a/crates/datadog-trace-agent/src/mini_agent.rs +++ b/crates/datadog-trace-agent/src/mini_agent.rs @@ -35,8 +35,8 @@ pub struct MiniAgent { pub stats_processor: Arc, pub stats_flusher: Arc, pub 
env_verifier: Arc, - pub stats_concentrator: stats_concentrator_service::StatsConcentratorHandle, - pub stats_generator: Arc, + pub stats_concentrator: Option, + pub stats_generator: Option>, } impl MiniAgent { @@ -65,24 +65,29 @@ impl MiniAgent { let (trace_tx_internal, trace_rx): (Sender, Receiver) = mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); - // Create an intercepting channel that generates stats from traces - let (trace_tx, mut trace_rx_intercept): (Sender, Receiver) = - mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); + // Conditionally create an intercepting channel that generates stats from traces + let trace_tx = if let Some(stats_generator) = self.stats_generator.clone() { + let (trace_tx, mut trace_rx_intercept): (Sender, Receiver) = + mpsc::channel(TRACER_PAYLOAD_CHANNEL_BUFFER_SIZE); - // Intercept traces to generate stats, then forward to trace flusher - let stats_generator = self.stats_generator.clone(); - tokio::spawn(async move { - while let Some(send_data) = trace_rx_intercept.recv().await { - // Generate stats from the trace payload - if let Err(e) = stats_generator.send(send_data.get_payloads()) { - error!("Failed to generate stats from traces: {e}"); - } - // Forward to trace flusher - if let Err(e) = trace_tx_internal.send(send_data).await { - error!("Failed to forward traces to flusher: {e}"); + // Intercept traces to generate stats, then forward to trace flusher + tokio::spawn(async move { + while let Some(send_data) = trace_rx_intercept.recv().await { + // Generate stats from the trace payload + if let Err(e) = stats_generator.send(send_data.get_payloads()) { + error!("Failed to generate stats from traces: {e}"); + } + // Forward to trace flusher + if let Err(e) = trace_tx_internal.send(send_data).await { + error!("Failed to forward traces to flusher: {e}"); + } } - } - }); + }); + trace_tx + } else { + // If stats generation is disabled, use the internal channel directly + trace_tx_internal.clone() + }; // start our trace 
flusher. receives trace payloads and handles buffering + deciding when to // flush to backend. @@ -124,28 +129,29 @@ impl MiniAgent { .await; }); - // Start periodic stats flush task - let stats_concentrator = self.stats_concentrator.clone(); - let flush_interval = self.config.stats_flush_interval; - tokio::spawn(async move { - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(flush_interval)).await; - match stats_concentrator.flush(false).await { - Ok(Some(stats_payload)) => { - debug!("Flushed generated stats from concentrator"); - if let Err(e) = stats_tx_generated.send(stats_payload).await { - error!("Failed to send flushed stats: {e}"); + // Start periodic stats flush task only if stats computation is enabled + if let Some(stats_concentrator) = self.stats_concentrator.clone() { + let flush_interval = self.config.stats_flush_interval; + tokio::spawn(async move { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(flush_interval)).await; + match stats_concentrator.flush(false).await { + Ok(Some(stats_payload)) => { + debug!("Flushed generated stats from concentrator"); + if let Err(e) = stats_tx_generated.send(stats_payload).await { + error!("Failed to send flushed stats: {e}"); + } + } + Ok(None) => { + debug!("No generated stats to flush"); + } + Err(e) => { + error!("Error flushing generated stats: {e}"); } - } - Ok(None) => { - debug!("No generated stats to flush"); - } - Err(e) => { - error!("Error flushing generated stats: {e}"); } } - } - }); + }); + } // setup our hyper http server, where the endpoint_handler handles incoming requests let trace_processor = self.trace_processor.clone(); From c26b1ae1d99f389670f8f75cc0b2e7faf4ed6ef6 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 17 Dec 2025 10:18:34 -0500 Subject: [PATCH 07/11] pass buffer length to span concentrator and set to 90 --- Cargo.lock | 16 ++++++++-------- crates/datadog-serverless-compat/Cargo.toml | 2 +- crates/datadog-trace-agent/Cargo.toml | 12 ++++++------ 
.../src/stats_concentrator_service.rs | 2 ++ 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83e23b5..5c82ea0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1294,7 +1294,7 @@ checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "libdd-common" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "cc", @@ -1327,7 +1327,7 @@ dependencies = [ [[package]] name = "libdd-ddsketch" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "prost", ] @@ -1335,7 +1335,7 @@ dependencies = [ [[package]] name = "libdd-tinybytes" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "serde", ] @@ -1343,7 +1343,7 @@ dependencies = [ [[package]] name = "libdd-trace-normalization" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "libdd-trace-protobuf", @@ -1352,7 +1352,7 @@ dependencies = [ [[package]] name = "libdd-trace-obfuscation" version = 
"1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "libdd-common", @@ -1369,7 +1369,7 @@ dependencies = [ [[package]] name = "libdd-trace-protobuf" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "prost", "serde", @@ -1379,7 +1379,7 @@ dependencies = [ [[package]] name = "libdd-trace-stats" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "hashbrown 0.15.2", "libdd-ddsketch", @@ -1390,7 +1390,7 @@ dependencies = [ [[package]] name = "libdd-trace-utils" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=435107c245112397914935c0f7148a18b91cafc6#435107c245112397914935c0f7148a18b91cafc6" +source = "git+https://github.com/DataDog/libdatadog?rev=774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7#774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" dependencies = [ "anyhow", "bytes", diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index 068db28..1f91acf 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -7,7 +7,7 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env [dependencies] datadog-trace-agent = { path = "../datadog-trace-agent" } 
-libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } dogstatsd = { path = "../dogstatsd", default-features = true } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7", default-features = false } diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index f9fc90f..0f05dd6 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -21,19 +21,19 @@ async-trait = "0.1.64" tracing = { version = "0.1", default-features = false } serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0" -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } -libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } -libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6", features = [ +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } +libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } +libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7", features = [ "mini_agent", ] } -libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6" } +libdd-trace-stats = { git = 
"https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7" } [dev-dependencies] rmp-serde = "1.1.1" serial_test = "2.0.0" duplicate = "0.4.1" tempfile = "3.3.0" -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "435107c245112397914935c0f7148a18b91cafc6", features = [ +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "774f830c7ffa62122a2cf8f3eae8c64a3ca5a8d7", features = [ "test-utils", ] } diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs index e6d6c70..6b5ab7a 100644 --- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -10,6 +10,7 @@ use tracing::error; const S_TO_NS: u64 = 1_000_000_000; const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds +const BUFFER_LENGTH: usize = 90; #[derive(Debug, thiserror::Error)] pub enum StatsError { @@ -118,6 +119,7 @@ impl StatsConcentratorService { SystemTime::now(), vec![], vec![], + BUFFER_LENGTH ); let service: StatsConcentratorService = Self { concentrator, From d368d761f09cfd083b0fce4639dfb8ab63f84691 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 17 Dec 2025 10:21:42 -0500 Subject: [PATCH 08/11] add info log for testing --- crates/datadog-serverless-compat/src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index a657d9b..b307f00 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -46,6 +46,8 @@ const AGENT_HOST: &str = "0.0.0.0"; #[tokio::main] pub async fn main() { + info!("Stats test - buffer length: 90, bucket duration: 10"); + let log_level = env::var("DD_LOG_LEVEL") .map(|val| val.to_lowercase()) .unwrap_or("info".to_string()); From ae4b63cbd4f32495228974adada20509517a39d8 Mon Sep 17 00:00:00 
2001 From: Duncan Harvey Date: Wed, 17 Dec 2025 10:26:16 -0500 Subject: [PATCH 09/11] apply formatting --- crates/datadog-trace-agent/src/stats_concentrator_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs index 6b5ab7a..f27a73d 100644 --- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -119,7 +119,7 @@ impl StatsConcentratorService { SystemTime::now(), vec![], vec![], - BUFFER_LENGTH + BUFFER_LENGTH, ); let service: StatsConcentratorService = Self { concentrator, From dc53c2b365be7179bc0289323aafce079e70d8a1 Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 17 Dec 2025 11:46:12 -0500 Subject: [PATCH 10/11] make buffer length configurable --- crates/datadog-serverless-compat/src/main.rs | 2 +- crates/datadog-trace-agent/src/config.rs | 8 ++++++++ .../datadog-trace-agent/src/stats_concentrator_service.rs | 3 +-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index b307f00..6fbebb9 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -46,7 +46,7 @@ const AGENT_HOST: &str = "0.0.0.0"; #[tokio::main] pub async fn main() { - info!("Stats test - buffer length: 90, bucket duration: 10"); + info!("Stats test - buffer length: configurable / default 90, bucket duration: 10"); let log_level = env::var("DD_LOG_LEVEL") .map(|val| val.to_lowercase()) diff --git a/crates/datadog-trace-agent/src/config.rs b/crates/datadog-trace-agent/src/config.rs index e6ac540..b9e81b8 100644 --- a/crates/datadog-trace-agent/src/config.rs +++ b/crates/datadog-trace-agent/src/config.rs @@ -84,6 +84,8 @@ pub struct Config { pub stats_flush_interval: u64, /// how often to flush traces, in seconds 
pub trace_flush_interval: u64, + /// buffer length for span concentrator + pub stats_buffer_length: usize, pub trace_intake: Endpoint, pub trace_stats_intake: Endpoint, /// timeout for environment verification, in milliseconds @@ -131,6 +133,11 @@ impl Config { Tags::new() }; + let stats_buffer_length: usize = env::var("DD_STATS_BUFFER_LENGTH") .ok() .and_then(|val| val.parse::<usize>().ok()) .unwrap_or(90); + #[allow(clippy::unwrap_used)] Ok(Config { app_name: Some(app_name), @@ -157,6 +164,7 @@ impl Config { .or_else(|_| env::var("HTTPS_PROXY")) .ok(), tags, + stats_buffer_length, }) } } diff --git a/crates/datadog-trace-agent/src/stats_concentrator_service.rs b/crates/datadog-trace-agent/src/stats_concentrator_service.rs index f27a73d..94bbdb0 100644 --- a/crates/datadog-trace-agent/src/stats_concentrator_service.rs +++ b/crates/datadog-trace-agent/src/stats_concentrator_service.rs @@ -10,7 +10,6 @@ use tracing::error; const S_TO_NS: u64 = 1_000_000_000; const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds -const BUFFER_LENGTH: usize = 90; #[derive(Debug, thiserror::Error)] pub enum StatsError { @@ -119,7 +118,7 @@ impl StatsConcentratorService { SystemTime::now(), vec![], vec![], - BUFFER_LENGTH, + config.stats_buffer_length, ); let service: StatsConcentratorService = Self { concentrator, From d5a3f08cb5d8467fe4f903e2a3a6915676e0826d Mon Sep 17 00:00:00 2001 From: Duncan Harvey Date: Wed, 17 Dec 2025 11:49:22 -0500 Subject: [PATCH 11/11] fix log location --- crates/datadog-serverless-compat/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 6fbebb9..5d10b90 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -46,8 +46,6 @@ const AGENT_HOST: &str = "0.0.0.0"; #[tokio::main] pub async fn main() { - info!("Stats test - buffer length: configurable / default 90, bucket
duration: 10"); - let log_level = env::var("DD_LOG_LEVEL") .map(|val| val.to_lowercase()) .unwrap_or("info".to_string()); @@ -106,6 +104,7 @@ pub async fn main() { #[allow(clippy::expect_used)] tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + info!("Stats test - buffer length: configurable / default 90, bucket duration: 10"); debug!("Logging subsystem enabled");