From 4702c4efa7c45e0d6b71ba25d3917977c2dc694b Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 16:37:48 -0500 Subject: [PATCH 1/6] Add an integration test that covers all of datadog-trace-agent --- .../tests/common/mock_server.rs | 128 ++++++++++++++++++ .../datadog-trace-agent/tests/common/mod.rs | 1 + .../tests/integration_test.rs | 126 ++++++++++++++++- 3 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 crates/datadog-trace-agent/tests/common/mock_server.rs diff --git a/crates/datadog-trace-agent/tests/common/mock_server.rs b/crates/datadog-trace-agent/tests/common/mock_server.rs new file mode 100644 index 0000000..c1b00bd --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mock_server.rs @@ -0,0 +1,128 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Simple mock HTTP server for testing flushers + +use http_body_util::BodyExt; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use libdd_common::hyper_migration; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use tokio::net::TcpListener; + +#[derive(Clone, Debug)] +pub struct ReceivedRequest { + pub method: String, + pub path: String, + pub headers: Vec<(String, String)>, + pub body: Vec, +} + +#[derive(Clone)] +pub struct MockServer { + pub addr: SocketAddr, + pub received_requests: Arc>>, +} + +impl MockServer { + /// Start a mock HTTP server on a random port + pub async fn start() -> Self { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind mock server"); + let addr = listener.local_addr().expect("Failed to get local addr"); + + let received_requests = Arc::new(Mutex::new(Vec::new())); + let requests_clone = received_requests.clone(); + + tokio::spawn(async move { + loop { + let (stream, _) = match listener.accept().await { + Ok(conn) => conn, + Err(_) => break, + }; + + let io = TokioIo::new(stream); + let requests = requests_clone.clone(); + + tokio::spawn(async move { + let service = hyper::service::service_fn(move |req: Request| { + let requests = requests.clone(); + async move { + // Capture the request + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let headers: Vec<(String, String)> = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + // Read the body + let body_bytes = req + .into_body() + .collect() + .await + .map(|collected| collected.to_bytes().to_vec()) + .unwrap_or_default(); + + // Store the request + requests.lock().unwrap().push(ReceivedRequest { + method, + path, + headers, + body: body_bytes, + }); + + // Return 200 OK + Ok::<_, hyper::http::Error>( + Response::builder() + .status(200) + .body(hyper_migration::Body::from(r#"{"ok":true}"#)) + .unwrap(), + ) + } + }); + + let _ = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service) + .await; + }); + } + }); + + MockServer { + addr, + received_requests, + } + } + + /// Get the base URL of the mock server + pub fn url(&self) -> String { + format!("http://{}", self.addr) + } + + /// Get all received requests + #[allow(dead_code)] + pub fn get_requests(&self) -> Vec { + self.received_requests.lock().unwrap().clone() + } + + /// Get requests matching a path + pub fn get_requests_for_path(&self, path: &str) -> Vec { + self.received_requests + .lock() + .unwrap() + .iter() + .filter(|req| req.path == path) + .cloned() + .collect() + } + + /// Clear all received requests + #[allow(dead_code)] + pub fn clear_requests(&self) { + self.received_requests.lock().unwrap().clear(); + } +} diff --git a/crates/datadog-trace-agent/tests/common/mod.rs b/crates/datadog-trace-agent/tests/common/mod.rs index 3d83bbd..447946a 100644 --- a/crates/datadog-trace-agent/tests/common/mod.rs +++ b/crates/datadog-trace-agent/tests/common/mod.rs @@ -4,4 +4,5 @@ //! Common test utilities, mocks, and helpers for integration tests pub mod helpers; +pub mod mock_server; pub mod mocks; diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 62d632e..b842a02 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -7,12 +7,13 @@ use common::helpers::{create_test_trace_payload, send_tcp_request}; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, - trace_processor::ServerlessTraceProcessor, + trace_flusher::TraceFlusher, trace_processor::ServerlessTraceProcessor, }; use http_body_util::BodyExt; use hyper::StatusCode; use libdd_trace_utils::trace_utils; use serde_json::Value; +use serial_test::serial; use std::sync::Arc; use std::time::Duration; @@ -49,6 +50,7 @@ pub fn create_tcp_test_config() -> Config { #[cfg(test)] #[tokio::test] +#[serial] async fn test_mini_agent_tcp_handles_requests() { let config = Arc::new(create_tcp_test_config()); let test_port = config.dd_apm_receiver_port; @@ -217,3 +219,125 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Clean up agent_handle.abort(); } + +#[cfg(test)] +#[tokio::test] +#[serial] +async fn test_mini_agent_with_real_flushers() { + use common::mock_server::MockServer; + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + // Start mock HTTP server to intercept trace/stats requests + let mock_server = MockServer::start().await; + + // Give mock server a moment to be ready + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create config pointing to mock server + let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); + let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); + + let mut config = create_tcp_test_config(); + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + // Set short flush intervals for faster testing + config.trace_flush_interval = 1; // 1 second + config.stats_flush_interval = 1; // 1 second + + let config = Arc::new(config); + let test_port = config.dd_apm_receiver_port; + + // Create mini agent with REAL flushers + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + }; + + // Start the mini agent + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Give server time to start + tokio::time::sleep(Duration::from_millis(100)).await; + + // Send trace data through the mini agent + let trace_payload = create_test_trace_payload(); + let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request"); + + assert_eq!( + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint" + ); + + // Wait for the trace flusher to flush (interval is 1 second + buffer) + tokio::time::sleep(Duration::from_millis(1500)).await; + + // Verify the mock server received the trace request + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + // Validate the trace request + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + // Check headers + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + // The real flusher uses application/x-protobuf after coalescing traces + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + // The body should be non-empty protobuf data + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); + + println!("✓ Trace flusher successfully sent data to mock server"); + println!(" - Received {} trace request(s)", trace_reqs.len()); + println!(" - Payload size: {} bytes", trace_req.body.len()); + println!(" - Headers: {} present", trace_req.headers.len()); + + // Clean up + agent_handle.abort(); +} From 08625819d032391fc946e62f4b3fd6932cf37140 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 9 Jan 2026 16:41:04 -0500 Subject: [PATCH 2/6] Add windows full integration test for datadog-trace-agent --- .../tests/integration_test.rs | 165 +++++++++++++++++- 1 file changed, 159 insertions(+), 6 deletions(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index b842a02..de5c932 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -21,10 +21,10 @@ use std::time::Duration; use common::helpers::send_named_pipe_request; /// Create a test config with TCP transport -pub fn create_tcp_test_config() -> Config { +pub fn create_tcp_test_config(port: u16) -> Config { Config { dd_site: "mock-datadoghq.com".to_string(), - dd_apm_receiver_port: 8126, + dd_apm_receiver_port: port, dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, dd_dogstatsd_windows_pipe_name: None, @@ -52,7 +52,7 @@ pub fn create_tcp_test_config() -> Config { #[tokio::test] #[serial] async fn test_mini_agent_tcp_handles_requests() { - let config = Arc::new(create_tcp_test_config()); + let config = Arc::new(create_tcp_test_config(8126)); let test_port = config.dd_apm_receiver_port; let mini_agent = MiniAgent { config: config.clone(), @@ -240,7 +240,7 @@ async fn test_mini_agent_with_real_flushers() { let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); - let mut config = create_tcp_test_config(); + let mut config = create_tcp_test_config(8127); config.trace_intake = libdd_common::Endpoint { url: trace_url.parse().unwrap(), api_key: Some("test-api-key".into()), @@ -277,8 +277,21 @@ async fn test_mini_agent_with_real_flushers() { let _ = mini_agent.start_mini_agent().await; }); - // Give server time to start - tokio::time::sleep(Duration::from_millis(100)).await; + // Wait for server to be ready by polling /info endpoint + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent server failed to start within timeout" + ); // Send trace data through the mini agent let trace_payload = create_test_trace_payload(); @@ -341,3 +354,143 @@ async fn test_mini_agent_with_real_flushers() { // Clean up agent_handle.abort(); } + +#[cfg(all(test, windows))] +#[tokio::test] +#[serial] +async fn test_mini_agent_named_pipe_with_real_flushers() { + use common::mock_server::MockServer; + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + // Start mock HTTP server to intercept trace/stats requests + let mock_server = MockServer::start().await; + + // Give mock server a moment to be ready + tokio::time::sleep(Duration::from_millis(50)).await; + + // Create config pointing to mock server + let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); + let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); + + let mut config = create_tcp_test_config(0); + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + // Set short flush intervals for faster testing + config.trace_flush_interval = 1; // 1 second + config.stats_flush_interval = 1; // 1 second + + // Configure for named pipe + let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test"; + config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + config.dd_apm_receiver_port = 0; + + let config = Arc::new(config); + + // Create mini agent with REAL flushers + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + let mini_agent = MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + }; + + // Start the mini agent + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Wait for server to be ready by polling /info endpoint + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_named_pipe_request(pipe_name, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent named pipe server failed to start within timeout" + ); + + // Send trace data through the mini agent via named pipe + let trace_payload = create_test_trace_payload(); + let trace_response = + send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); + + assert_eq!( + trace_response.status(), + StatusCode::OK, + "Expected 200 OK from /v0.4/traces endpoint over named pipe" + ); + + // Wait for the trace flusher to flush (interval is 1 second + buffer) + tokio::time::sleep(Duration::from_millis(1500)).await; + + // Verify the mock server received the trace request + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + // Validate the trace request + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + // Check headers + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + // The real flusher uses application/x-protobuf after coalescing traces + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + // The body should be non-empty protobuf data + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); + + println!("✓ [Named Pipe] Trace flusher successfully sent data to mock server"); + println!(" - Received {} trace request(s)", trace_reqs.len()); + println!(" - Payload size: {} bytes", trace_req.body.len()); + println!(" - Headers: {} present", trace_req.headers.len()); + + // Clean up + agent_handle.abort(); +} From e680a1b88dc28e207da81619fd6802add5e08c48 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 30 Jan 2026 17:37:18 -0500 Subject: [PATCH 3/6] Extract test helpers --- .../tests/integration_test.rs | 278 ++++++------------ 1 file changed, 93 insertions(+), 185 deletions(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index de5c932..66ed7d7 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -20,6 +20,84 @@ use std::time::Duration; #[cfg(windows)] use common::helpers::send_named_pipe_request; +/// Helper to configure a config with mock server endpoints +pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { + let trace_url = format!("{}/api/v0.2/traces", mock_server_url); + let stats_url = format!("{}/api/v0.6/stats", mock_server_url); + + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_flush_interval_secs = 1; + config.stats_flush_interval_secs = 1; +} + +/// Helper to create a mini agent with real flushers +pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), + } +} + +/// Helper to verify trace request sent to mock server +pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { + + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); +} + /// Create a test config with TCP transport pub fn create_tcp_test_config(port: u16) -> Config { Config { @@ -225,59 +303,22 @@ async fn test_mini_agent_named_pipe_handles_requests() { #[serial] async fn test_mini_agent_with_real_flushers() { use common::mock_server::MockServer; - use datadog_trace_agent::{ - aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, - stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, - }; - // Start mock HTTP server to intercept trace/stats requests let mock_server = MockServer::start().await; - - // Give mock server a moment to be ready tokio::time::sleep(Duration::from_millis(50)).await; - // Create config pointing to mock server - let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); - let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); - let mut config = create_tcp_test_config(8127); - config.trace_intake = libdd_common::Endpoint { - url: trace_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - config.trace_stats_intake = libdd_common::Endpoint { - url: stats_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - // Set short flush intervals for faster testing - config.trace_flush_interval = 1; // 1 second - config.stats_flush_interval = 1; // 1 second - + configure_mock_endpoints(&mut config, &mock_server.url()); let config = Arc::new(config); let test_port = config.dd_apm_receiver_port; - // Create mini agent with REAL flushers - let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); - let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor {}), - trace_flusher: Arc::new(ServerlessTraceFlusher::new( - aggregator.clone(), - config.clone(), - )), - stats_processor: Arc::new(ServerlessStatsProcessor {}), - stats_flusher: Arc::new(ServerlessStatsFlusher {}), - env_verifier: Arc::new(MockEnvVerifier), - }; + let mini_agent = create_mini_agent_with_real_flushers(config); - // Start the mini agent let agent_handle = tokio::spawn(async move { let _ = mini_agent.start_mini_agent().await; }); - // Wait for server to be ready by polling /info endpoint + // Wait for server to be ready let mut server_ready = false; for _ in 0..20 { tokio::time::sleep(Duration::from_millis(50)).await; @@ -293,65 +334,18 @@ async fn test_mini_agent_with_real_flushers() { "Mini agent server failed to start within timeout" ); - // Send trace data through the mini agent + // Send trace data let trace_payload = create_test_trace_payload(); let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) .await .expect("Failed to send /v0.4/traces request"); + assert_eq!(trace_response.status(), StatusCode::OK); - assert_eq!( - trace_response.status(), - StatusCode::OK, - "Expected 200 OK from /v0.4/traces endpoint" - ); - - // Wait for the trace flusher to flush (interval is 1 second + buffer) + // Wait for flush tokio::time::sleep(Duration::from_millis(1500)).await; - // Verify the mock server received the trace request - let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + verify_trace_request(&mock_server); - assert!( - !trace_reqs.is_empty(), - "Expected at least one trace request to mock server" - ); - - // Validate the trace request - let trace_req = &trace_reqs[0]; - assert_eq!(trace_req.method, "POST", "Expected POST method"); - - // Check headers - let content_type = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "content-type") - .map(|(_, v)| v.as_str()); - // The real flusher uses application/x-protobuf after coalescing traces - assert_eq!( - content_type, - Some("application/x-protobuf"), - "Expected protobuf content-type" - ); - - let api_key = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "dd-api-key") - .map(|(_, v)| v.as_str()); - assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); - - // The body should be non-empty protobuf data - assert!( - !trace_req.body.is_empty(), - "Expected non-empty trace payload" - ); - - println!("✓ Trace flusher successfully sent data to mock server"); - println!(" - Received {} trace request(s)", trace_reqs.len()); - println!(" - Payload size: {} bytes", trace_req.body.len()); - println!(" - Headers: {} present", trace_req.headers.len()); - - // Clean up agent_handle.abort(); } @@ -360,63 +354,24 @@ async fn test_mini_agent_with_real_flushers() { #[serial] async fn test_mini_agent_named_pipe_with_real_flushers() { use common::mock_server::MockServer; - use datadog_trace_agent::{ - aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, - stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, - }; - // Start mock HTTP server to intercept trace/stats requests let mock_server = MockServer::start().await; - - // Give mock server a moment to be ready tokio::time::sleep(Duration::from_millis(50)).await; - // Create config pointing to mock server - let trace_url = format!("{}/api/v0.2/traces", mock_server.url()); - let stats_url = format!("{}/api/v0.6/stats", mock_server.url()); - - let mut config = create_tcp_test_config(0); - config.trace_intake = libdd_common::Endpoint { - url: trace_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - config.trace_stats_intake = libdd_common::Endpoint { - url: stats_url.parse().unwrap(), - api_key: Some("test-api-key".into()), - ..Default::default() - }; - // Set short flush intervals for faster testing - config.trace_flush_interval = 1; // 1 second - config.stats_flush_interval = 1; // 1 second - - // Configure for named pipe let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test"; + let mut config = create_tcp_test_config(0); + configure_mock_endpoints(&mut config, &mock_server.url()); config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); config.dd_apm_receiver_port = 0; - let config = Arc::new(config); - // Create mini agent with REAL flushers - let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); - let mini_agent = MiniAgent { - config: config.clone(), - trace_processor: Arc::new(ServerlessTraceProcessor {}), - trace_flusher: Arc::new(ServerlessTraceFlusher::new( - aggregator.clone(), - config.clone(), - )), - stats_processor: Arc::new(ServerlessStatsProcessor {}), - stats_flusher: Arc::new(ServerlessStatsFlusher {}), - env_verifier: Arc::new(MockEnvVerifier), - }; + let mini_agent = create_mini_agent_with_real_flushers(config); - // Start the mini agent let agent_handle = tokio::spawn(async move { let _ = mini_agent.start_mini_agent().await; }); - // Wait for server to be ready by polling /info endpoint + // Wait for server to be ready let mut server_ready = false; for _ in 0..20 { tokio::time::sleep(Duration::from_millis(50)).await; @@ -432,65 +387,18 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { "Mini agent named pipe server failed to start within timeout" ); - // Send trace data through the mini agent via named pipe + // Send trace data via named pipe let trace_payload = create_test_trace_payload(); let trace_response = send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) .await .expect("Failed to send /v0.4/traces request over named pipe"); + assert_eq!(trace_response.status(), StatusCode::OK); - assert_eq!( - trace_response.status(), - StatusCode::OK, - "Expected 200 OK from /v0.4/traces endpoint over named pipe" - ); - - // Wait for the trace flusher to flush (interval is 1 second + buffer) + // Wait for flush tokio::time::sleep(Duration::from_millis(1500)).await; - // Verify the mock server received the trace request - let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); - - assert!( - !trace_reqs.is_empty(), - "Expected at least one trace request to mock server" - ); - - // Validate the trace request - let trace_req = &trace_reqs[0]; - assert_eq!(trace_req.method, "POST", "Expected POST method"); - - // Check headers - let content_type = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "content-type") - .map(|(_, v)| v.as_str()); - // The real flusher uses application/x-protobuf after coalescing traces - assert_eq!( - content_type, - Some("application/x-protobuf"), - "Expected protobuf content-type" - ); - - let api_key = trace_req - .headers - .iter() - .find(|(k, _)| k.to_lowercase() == "dd-api-key") - .map(|(_, v)| v.as_str()); - assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + verify_trace_request(&mock_server); - // The body should be non-empty protobuf data - assert!( - !trace_req.body.is_empty(), - "Expected non-empty trace payload" - ); - - println!("✓ [Named Pipe] Trace flusher successfully sent data to mock server"); - println!(" - Received {} trace request(s)", trace_reqs.len()); - println!(" - Payload size: {} bytes", trace_req.body.len()); - println!(" - Headers: {} present", trace_req.headers.len()); - - // Clean up agent_handle.abort(); } From 158363f5b4664f250fce4626795fa203f9c50464 Mon Sep 17 00:00:00 2001 From: Lewis Date: Fri, 30 Jan 2026 17:45:01 -0500 Subject: [PATCH 4/6] Improve test quality --- .../tests/common/mock_server.rs | 121 ++++++++++-------- .../tests/integration_test.rs | 14 +- 2 files changed, 77 insertions(+), 58 deletions(-) diff --git a/crates/datadog-trace-agent/tests/common/mock_server.rs b/crates/datadog-trace-agent/tests/common/mock_server.rs index c1b00bd..5e0b485 100644 --- a/crates/datadog-trace-agent/tests/common/mock_server.rs +++ b/crates/datadog-trace-agent/tests/common/mock_server.rs @@ -19,10 +19,10 @@ pub struct ReceivedRequest { pub body: Vec, } -#[derive(Clone)] pub struct MockServer { pub addr: SocketAddr, pub received_requests: Arc>>, + shutdown_tx: Option>, } impl MockServer { @@ -36,65 +36,78 @@ impl MockServer { let received_requests = Arc::new(Mutex::new(Vec::new())); let requests_clone = received_requests.clone(); + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { loop { - let (stream, _) = match listener.accept().await { - Ok(conn) => conn, - Err(_) => break, - }; - - let io = TokioIo::new(stream); - let requests = requests_clone.clone(); - - tokio::spawn(async move { - let service = hyper::service::service_fn(move |req: Request| { - let requests = requests.clone(); - async move { - // Capture the request - let method = req.method().to_string(); - let path = req.uri().path().to_string(); - let headers: Vec<(String, String)> = req - .headers() - .iter() - .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) - .collect(); - - // Read the body - let body_bytes = req - .into_body() - .collect() - .await - .map(|collected| collected.to_bytes().to_vec()) - .unwrap_or_default(); - - // Store the request - requests.lock().unwrap().push(ReceivedRequest { - method, - path, - headers, - body: body_bytes, + tokio::select! { + result = listener.accept() => { + let (stream, _) = match result { + Ok(conn) => conn, + Err(e) => { + eprintln!("Mock server accept error: {}", e); + break; + } + }; + + let io = TokioIo::new(stream); + let requests = requests_clone.clone(); + + tokio::spawn(async move { + let service = hyper::service::service_fn(move |req: Request| { + let requests = requests.clone(); + async move { + // Capture the request + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let headers: Vec<(String, String)> = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + // Read the body + let body_bytes = req + .into_body() + .collect() + .await + .map(|collected| collected.to_bytes().to_vec()) + .unwrap_or_default(); + + // Store the request + requests.lock().unwrap().push(ReceivedRequest { + method, + path, + headers, + body: body_bytes, + }); + + // Return 200 OK + Ok::<_, hyper::http::Error>( + Response::builder() + .status(200) + .body(hyper_migration::Body::from(r#"{"ok":true}"#)) + .unwrap(), + ) + } }); - // Return 200 OK - Ok::<_, hyper::http::Error>( - Response::builder() - .status(200) - .body(hyper_migration::Body::from(r#"{"ok":true}"#)) - .unwrap(), - ) - } - }); - - let _ = hyper::server::conn::http1::Builder::new() - .serve_connection(io, service) - .await; - }); + let _ = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service) + .await; + }); + } + _ = &mut shutdown_rx => { + break; + } + } } }); MockServer { addr, received_requests, + shutdown_tx: Some(shutdown_tx), } } @@ -126,3 +139,11 @@ impl MockServer { self.received_requests.lock().unwrap().clear(); } } + +impl Drop for MockServer { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 66ed7d7..efc8e28 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -4,6 +4,7 @@ mod common; use common::helpers::{create_test_trace_payload, send_tcp_request}; +use common::mock_server::MockServer; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, @@ -20,6 +21,8 @@ use std::time::Duration; #[cfg(windows)] use common::helpers::send_named_pipe_request; +const FLUSH_WAIT_DURATION: Duration = Duration::from_millis(1500); + /// Helper to configure a config with mock server endpoints pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { let trace_url = format!("{}/api/v0.2/traces", mock_server_url); @@ -63,7 +66,6 @@ pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { /// Helper to verify trace request sent to mock server pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { - let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); assert!( @@ -301,9 +303,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { #[cfg(test)] #[tokio::test] #[serial] -async fn test_mini_agent_with_real_flushers() { - use common::mock_server::MockServer; - +async fn test_mini_agent_tcp_with_real_flushers() { let mock_server = MockServer::start().await; tokio::time::sleep(Duration::from_millis(50)).await; @@ -342,7 +342,7 @@ async fn test_mini_agent_with_real_flushers() { assert_eq!(trace_response.status(), StatusCode::OK); // Wait for flush - tokio::time::sleep(Duration::from_millis(1500)).await; + tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_trace_request(&mock_server); @@ -353,8 +353,6 @@ async fn test_mini_agent_with_real_flushers() { #[tokio::test] #[serial] async fn test_mini_agent_named_pipe_with_real_flushers() { - use common::mock_server::MockServer; - let mock_server = MockServer::start().await; tokio::time::sleep(Duration::from_millis(50)).await; @@ -396,7 +394,7 @@ async fn test_mini_agent_named_pipe_with_real_flushers() { assert_eq!(trace_response.status(), StatusCode::OK); // Wait for flush - tokio::time::sleep(Duration::from_millis(1500)).await; + tokio::time::sleep(FLUSH_WAIT_DURATION).await; verify_trace_request(&mock_server); From 2d12e2fc9ed537ce65802b51feb7c71094d40371 Mon Sep 17 00:00:00 2001 From: Lewis Date: Mon, 2 Feb 2026 12:50:19 -0500 Subject: [PATCH 5/6] Add type --- crates/datadog-trace-agent/tests/integration_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index efc8e28..dc052dc 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -304,7 +304,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { #[tokio::test] #[serial] async fn test_mini_agent_tcp_with_real_flushers() { - let mock_server = MockServer::start().await; + let mock_server: MockServer = MockServer::start().await; tokio::time::sleep(Duration::from_millis(50)).await; let mut config = create_tcp_test_config(8127); From 2282e8c1484e7b4942aaa0ebc0562e7fa143d271 Mon Sep 17 00:00:00 2001 From: Lewis Date: Tue, 3 Feb 2026 09:19:00 -0500 Subject: [PATCH 6/6] Fix windows test --- crates/datadog-trace-agent/tests/integration_test.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index dc052dc..b2de4c2 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -226,7 +226,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior let pipe_name = "dd_trace_integration_test"; let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections - let mut config = create_tcp_test_config(); + let mut config = create_tcp_test_config(0); config.dd_apm_windows_pipe_name = Some(pipe_path.clone()); config.dd_apm_receiver_port = 0; let config = Arc::new(config);