diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index c99723d58..b141e71d9 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -2477,8 +2477,8 @@ mod tests { ); } - #[test] - fn enrich_ctx_keeps_tracer_set_cold_start_trace_id_without_tracer_detected() { + #[tokio::test] + async fn enrich_ctx_keeps_tracer_set_cold_start_trace_id_without_tracer_detected() { let mut p = setup(); let request_id = String::from("node-python-cold-start"); let mut cold_start_span = create_empty_span( diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs index 6523c8569..e540fe502 100644 --- a/bottlecap/src/traces/stats_concentrator_service.rs +++ b/bottlecap/src/traces/stats_concentrator_service.rs @@ -12,6 +12,68 @@ use tracing::error; const S_TO_NS: u64 = 1_000_000_000; const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds +/// Span kinds eligible for stats computation, matching the Go agent's default +/// `ComputeStatsBySpanKind: true` behavior. +/// Reference: `datadog-agent/pkg/trace/stats/span_concentrator.go` (`KindsComputed`) +/// +/// TODO: The source of truth is the Go agent's `KindsComputed`; this list is hand-copied here +/// and in other Rust repos. Refactor so they stay in sync with the Go agent instead of each +/// keeping its own copy. +const STATS_ELIGIBLE_SPAN_KINDS: &[&str] = &["client", "consumer", "producer", "server"]; + +/// Default peer tag keys for stats aggregation, matching the Go agent's `basePeerTags` +/// derived from pkg/trace/semantics/mappings.json via the 16 peer tag concepts. +/// Reference: `datadog-agent/pkg/trace/config/peer_tags.go` (`peerTagConcepts` + `basePeerTags`) +/// +/// TODO: The source of truth is the Go agent's `basePeerTags` (derived from +/// pkg/trace/semantics/mappings.json); this list is hand-copied here and in other Rust repos. +/// Refactor so they stay in sync with the Go agent instead of each keeping its own copy. +const DEFAULT_PEER_TAG_KEYS: &[&str] = &[ + "_dd.base_service", + "active_record.db.vendor", + "amqp.destination", + "amqp.exchange", + "amqp.queue", + "aws.queue.name", + "aws.s3.bucket", + "bucketname", + "cassandra.keyspace", + "db.cassandra.contact.points", + "db.couchbase.seed.nodes", + "db.hostname", + "db.instance", + "db.name", + "db.namespace", + "db.system", + "db.type", + "dns.hostname", + "grpc.host", + "hostname", + "http.host", + "http.server_name", + "messaging.destination", + "messaging.destination.name", + "messaging.kafka.bootstrap.servers", + "messaging.rabbitmq.exchange", + "messaging.system", + "mongodb.db", + "msmq.queue.path", + "net.peer.name", + "network.destination.ip", + "network.destination.name", + "out.host", + "peer.hostname", + "peer.service", + "queuename", + "rpc.service", + "rpc.system", + "sequel.db.vendor", + "server.address", + "streamname", + "tablename", + "topicname", +]; + #[derive(Debug, thiserror::Error)] pub enum StatsError { #[error("Failed to send command to concentrator: {0}")] @@ -113,12 +175,17 @@ impl StatsConcentratorService { pub fn new(config: Arc) -> (Self, StatsConcentratorHandle) { let (tx, rx) = mpsc::unbounded_channel(); let handle = StatsConcentratorHandle::new(tx); - // TODO: set span_kinds_stats_computed and peer_tag_keys let concentrator = SpanConcentrator::new( Duration::from_nanos(BUCKET_DURATION_NS), SystemTime::now(), - vec![], - vec![], + STATS_ELIGIBLE_SPAN_KINDS + .iter() + .map(ToString::to_string) + .collect(), + DEFAULT_PEER_TAG_KEYS + .iter() + .map(ToString::to_string) + .collect(), ); let service: StatsConcentratorService = Self { concentrator, @@ -192,3 +259,121 @@ impl StatsConcentratorService { } } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use std::collections::HashMap; + + /// Create a `pb::Span` with the given meta tags and metrics. + /// The span is non-root (`parent_id=1`) and not measured, so it will only be + /// eligible for stats if `span_kinds_stats_computed` includes its `span.kind`. + fn create_span_kind_span(span_kind: &str, meta: Vec<(&str, &str)>) -> pb::Span { + let now_ns = i64::try_from( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos(), + ) + .unwrap(); + let mut meta_map: HashMap = meta + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(); + meta_map.insert("span.kind".to_string(), span_kind.to_string()); + pb::Span { + service: "test-service".to_string(), + name: "test-op".to_string(), + resource: "test-resource".to_string(), + trace_id: 1, + span_id: 2, + parent_id: 1, // non-root + start: now_ns, + duration: 100, + error: 0, + r#type: "web".to_string(), + meta: meta_map, + metrics: HashMap::new(), // no _top_level, no _dd.measured + meta_struct: HashMap::new(), + span_links: vec![], + span_events: vec![], + } + } + + /// A non-root, non-measured span with `span.kind`="client" should produce stats + /// because `span_kinds_stats_computed` is populated with the eligible span kinds. + #[tokio::test] + async fn test_span_kind_stats_computed() { + let config = Arc::new(Config::default()); + let (service, handle) = StatsConcentratorService::new(config); + tokio::spawn(service.run()); + + let span = create_span_kind_span("client", vec![]); + handle.add(&span).unwrap(); + + let result = handle.flush(true).await.unwrap(); + + assert!( + result.is_some(), + "Expected stats for a client span, but got None. \ + span.kind-based eligibility is not working." + ); + let payload = result.unwrap(); + let all_stats: Vec<_> = payload.stats.iter().flat_map(|b| &b.stats).collect(); + assert!( + !all_stats.is_empty(), + "Expected at least one grouped stats entry for the client span." + ); + let client_stats: Vec<_> = all_stats + .iter() + .filter(|s| s.span_kind == "client") + .collect(); + assert!( + !client_stats.is_empty(), + "Expected a stats entry with span_kind='client'." + ); + } + + /// A client span with peer tag meta keys (`db.instance`, `db.system`) should produce + /// stats with non-empty `peer_tags` because `peer_tag_keys` is configured. + #[tokio::test] + async fn test_peer_tags_populated() { + let config = Arc::new(Config::default()); + let (service, handle) = StatsConcentratorService::new(config); + tokio::spawn(service.run()); + + let span = create_span_kind_span( + "client", + vec![("db.instance", "i-1234"), ("db.system", "postgres")], + ); + handle.add(&span).unwrap(); + + let result = handle.flush(true).await.unwrap(); + + assert!( + result.is_some(), + "Expected stats for a client span with peer tags, but got None." + ); + let payload = result.unwrap(); + let all_stats: Vec<_> = payload.stats.iter().flat_map(|b| &b.stats).collect(); + let stats_with_peer_tags: Vec<_> = all_stats + .iter() + .filter(|s| !s.peer_tags.is_empty()) + .collect(); + assert!( + !stats_with_peer_tags.is_empty(), + "Expected at least one stats entry with non-empty peer_tags, \ + but all entries have empty peer_tags." + ); + let peer_tags = &stats_with_peer_tags[0].peer_tags; + assert!( + peer_tags.iter().any(|t| t.starts_with("db.instance:")), + "Expected peer_tags to contain db.instance, got: {peer_tags:?}" + ); + assert!( + peer_tags.iter().any(|t| t.starts_with("db.system:")), + "Expected peer_tags to contain db.system, got: {peer_tags:?}" + ); + } +} diff --git a/bottlecap/tests/apm_integration_test.rs b/bottlecap/tests/apm_integration_test.rs index 8fd559af5..ec7cb4b58 100644 --- a/bottlecap/tests/apm_integration_test.rs +++ b/bottlecap/tests/apm_integration_test.rs @@ -14,8 +14,10 @@ //! This is what APMSVLS-496 phase 1 unblocks: regression coverage for //! payload-level changes that `body_contains`-style mocks can't catch. +use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use bottlecap::LAMBDA_RUNTIME_SLUG; use bottlecap::config::Config; @@ -498,3 +500,148 @@ async fn e2e_client_computed_stats_absent_meta_and_no_stats() { ); assert!(outcome.stats.is_empty(), "no stats payloads must be sent",); } + +/// Build a non-root, non-measured span eligible for stats only via its `span.kind`. +/// +/// `parent_id` is non-zero (non-root) and `metrics` is empty (no `_top_level` / +/// `_dd.measured`), so the concentrator will only compute stats for it when its +/// `span.kind` is in `span_kinds_stats_computed`. `start` is set to "now" so the +/// span lands in the current bucket and a forced flush returns it. +fn make_eligible_span(span_kind: &str, peer_meta: &[(&str, &str)]) -> pb::Span { + let now_ns = i64::try_from( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time before unix epoch") + .as_nanos(), + ) + .expect("nanos since epoch must fit in i64"); + + let mut meta: HashMap = peer_meta + .iter() + .map(|(k, v)| ((*k).to_string(), (*v).to_string())) + .collect(); + meta.insert("span.kind".to_string(), span_kind.to_string()); + + pb::Span { + service: "fake-intake-stats-service".to_string(), + name: "test-op".to_string(), + resource: "test-resource".to_string(), + trace_id: 1, + span_id: 2, + parent_id: 1, // non-root + start: now_ns, + duration: 100, + error: 0, + r#type: "web".to_string(), + meta, + metrics: HashMap::new(), // no _top_level, no _dd.measured + ..pb::Span::default() + } +} + +/// Wire concentrator -> aggregator -> flusher pointed at the fake intake, feed in +/// `spans`, force a flush, and return the single captured `StatsPayload`. +async fn flush_spans_to_fake_intake( + fake_intake: &FakeIntake, + spans: &[pb::Span], +) -> pb::StatsPayload { + let config = test_config(); + let http_client = create_client(None, None, false).expect("failed to create http client"); + + let (concentrator_service, concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(concentrator_service.run()); + + let aggregator = Arc::new(Mutex::new(StatsAggregator::new_with_concentrator( + concentrator_handle.clone(), + ))); + + for span in spans { + concentrator_handle + .add(span) + .expect("concentrator add must succeed"); + } + + let api_key_factory = Arc::new(ApiKeyFactory::new(DD_API_KEY)); + let flusher = StatsFlusher::new( + api_key_factory, + aggregator, + config, + http_client, + fake_intake.stats_url(), + ); + + let failed = flusher.flush(true, None).await; + assert!( + failed.is_none(), + "stats flush reported a retry-able failure: {failed:?}", + ); + + let captured = fake_intake.stats_payloads(); + assert_eq!(captured.len(), 1, "expected exactly one StatsPayload"); + captured.into_iter().next().expect("captured payload") +} + +/// End-to-end: a non-root, non-measured `span.kind="server"` span fed through the +/// concentrator must survive aggregation + msgpack/gzip serialization and arrive +/// at the intake as a grouped-stats entry with `span_kind="server"`. This closes +/// the gap left by the in-memory concentrator unit tests, which never serialize. +#[tokio::test] +async fn stats_span_kind_through_fake_intake() { + let fake_intake = FakeIntake::start().await; + let span = make_eligible_span("server", &[]); + + let payload = flush_spans_to_fake_intake(&fake_intake, &[span]).await; + + let grouped: Vec<_> = payload + .stats + .iter() + .flat_map(|p| &p.stats) + .flat_map(|b| &b.stats) + .collect(); + assert!( + !grouped.is_empty(), + "expected at least one grouped-stats entry for the server span", + ); + assert!( + grouped.iter().any(|s| s.span_kind == "server"), + "expected a grouped-stats entry with span_kind='server', got: {:?}", + grouped.iter().map(|s| &s.span_kind).collect::>(), + ); +} + +/// End-to-end: a `span.kind="client"` span carrying peer-tag meta keys +/// (`db.instance`, `db.system`) must arrive at the intake with those keys +/// populated in `peer_tags`, proving peer-tags survive serialization through +/// the concentrator -> flusher -> intake path. +#[tokio::test] +async fn stats_peer_tags_through_fake_intake() { + let fake_intake = FakeIntake::start().await; + let span = make_eligible_span( + "client", + &[("db.instance", "i-1234"), ("db.system", "postgres")], + ); + + let payload = flush_spans_to_fake_intake(&fake_intake, &[span]).await; + + let with_peer_tags: Vec<_> = payload + .stats + .iter() + .flat_map(|p| &p.stats) + .flat_map(|b| &b.stats) + .filter(|s| !s.peer_tags.is_empty()) + .collect(); + assert!( + !with_peer_tags.is_empty(), + "expected at least one grouped-stats entry with non-empty peer_tags", + ); + let peer_tags = &with_peer_tags[0].peer_tags; + assert!( + peer_tags.iter().any(|t| t.starts_with("db.instance:")), + "expected peer_tags to contain db.instance, got: {peer_tags:?}", + ); + assert!( + peer_tags.iter().any(|t| t.starts_with("db.system:")), + "expected peer_tags to contain db.system, got: {peer_tags:?}", + ); +}