diff --git a/crates/datadog-trace-agent/tests/common/mock_server.rs b/crates/datadog-trace-agent/tests/common/mock_server.rs new file mode 100644 index 0000000..5e0b485 --- /dev/null +++ b/crates/datadog-trace-agent/tests/common/mock_server.rs @@ -0,0 +1,149 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Simple mock HTTP server for testing flushers + +use http_body_util::BodyExt; +use hyper::{body::Incoming, Request, Response}; +use hyper_util::rt::TokioIo; +use libdd_common::hyper_migration; +use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; +use tokio::net::TcpListener; + +#[derive(Clone, Debug)] +pub struct ReceivedRequest { + pub method: String, + pub path: String, + pub headers: Vec<(String, String)>, + pub body: Vec, +} + +pub struct MockServer { + pub addr: SocketAddr, + pub received_requests: Arc>>, + shutdown_tx: Option>, +} + +impl MockServer { + /// Start a mock HTTP server on a random port + pub async fn start() -> Self { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind mock server"); + let addr = listener.local_addr().expect("Failed to get local addr"); + + let received_requests = Arc::new(Mutex::new(Vec::new())); + let requests_clone = received_requests.clone(); + + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! { + result = listener.accept() => { + let (stream, _) = match result { + Ok(conn) => conn, + Err(e) => { + eprintln!("Mock server accept error: {}", e); + break; + } + }; + + let io = TokioIo::new(stream); + let requests = requests_clone.clone(); + + tokio::spawn(async move { + let service = hyper::service::service_fn(move |req: Request| { + let requests = requests.clone(); + async move { + // Capture the request + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let headers: Vec<(String, String)> = req + .headers() + .iter() + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) + .collect(); + + // Read the body + let body_bytes = req + .into_body() + .collect() + .await + .map(|collected| collected.to_bytes().to_vec()) + .unwrap_or_default(); + + // Store the request + requests.lock().unwrap().push(ReceivedRequest { + method, + path, + headers, + body: body_bytes, + }); + + // Return 200 OK + Ok::<_, hyper::http::Error>( + Response::builder() + .status(200) + .body(hyper_migration::Body::from(r#"{"ok":true}"#)) + .unwrap(), + ) + } + }); + + let _ = hyper::server::conn::http1::Builder::new() + .serve_connection(io, service) + .await; + }); + } + _ = &mut shutdown_rx => { + break; + } + } + } + }); + + MockServer { + addr, + received_requests, + shutdown_tx: Some(shutdown_tx), + } + } + + /// Get the base URL of the mock server + pub fn url(&self) -> String { + format!("http://{}", self.addr) + } + + /// Get all received requests + #[allow(dead_code)] + pub fn get_requests(&self) -> Vec { + self.received_requests.lock().unwrap().clone() + } + + /// Get requests matching a path + pub fn get_requests_for_path(&self, path: &str) -> Vec { + self.received_requests + .lock() + .unwrap() + .iter() + .filter(|req| req.path == path) + .cloned() + .collect() + } + + /// Clear all received requests + #[allow(dead_code)] + pub fn clear_requests(&self) { + self.received_requests.lock().unwrap().clear(); + } +} + +impl Drop for MockServer { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} diff --git a/crates/datadog-trace-agent/tests/common/mod.rs b/crates/datadog-trace-agent/tests/common/mod.rs index 3d83bbd..447946a 100644 --- a/crates/datadog-trace-agent/tests/common/mod.rs +++ b/crates/datadog-trace-agent/tests/common/mod.rs @@ -4,4 +4,5 @@ //! Common test utilities, mocks, and helpers for integration tests pub mod helpers; +pub mod mock_server; pub mod mocks; diff --git a/crates/datadog-trace-agent/tests/integration_test.rs b/crates/datadog-trace-agent/tests/integration_test.rs index 62d632e..b2de4c2 100644 --- a/crates/datadog-trace-agent/tests/integration_test.rs +++ b/crates/datadog-trace-agent/tests/integration_test.rs @@ -4,26 +4,107 @@ mod common; use common::helpers::{create_test_trace_payload, send_tcp_request}; +use common::mock_server::MockServer; use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher}; use datadog_trace_agent::{ config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher, - trace_processor::ServerlessTraceProcessor, + trace_flusher::TraceFlusher, trace_processor::ServerlessTraceProcessor, }; use http_body_util::BodyExt; use hyper::StatusCode; use libdd_trace_utils::trace_utils; use serde_json::Value; +use serial_test::serial; use std::sync::Arc; use std::time::Duration; #[cfg(windows)] use common::helpers::send_named_pipe_request; +const FLUSH_WAIT_DURATION: Duration = Duration::from_millis(1500); + +/// Helper to configure a config with mock server endpoints +pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) { + let trace_url = format!("{}/api/v0.2/traces", mock_server_url); + let stats_url = format!("{}/api/v0.6/stats", mock_server_url); + + config.trace_intake = libdd_common::Endpoint { + url: trace_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_stats_intake = libdd_common::Endpoint { + url: stats_url.parse().unwrap(), + api_key: Some("test-api-key".into()), + ..Default::default() + }; + config.trace_flush_interval_secs = 1; + config.stats_flush_interval_secs = 1; +} + +/// Helper to create a mini agent with real flushers +pub fn create_mini_agent_with_real_flushers(config: Arc) -> MiniAgent { + use datadog_trace_agent::{ + aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher, + stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher, + }; + + let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default())); + MiniAgent { + config: config.clone(), + trace_processor: Arc::new(ServerlessTraceProcessor {}), + trace_flusher: Arc::new(ServerlessTraceFlusher::new( + aggregator.clone(), + config.clone(), + )), + stats_processor: Arc::new(ServerlessStatsProcessor {}), + stats_flusher: Arc::new(ServerlessStatsFlusher {}), + env_verifier: Arc::new(MockEnvVerifier), + proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())), + } +} + +/// Helper to verify trace request sent to mock server +pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) { + let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces"); + + assert!( + !trace_reqs.is_empty(), + "Expected at least one trace request to mock server" + ); + + let trace_req = &trace_reqs[0]; + assert_eq!(trace_req.method, "POST", "Expected POST method"); + + let content_type = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "content-type") + .map(|(_, v)| v.as_str()); + assert_eq!( + content_type, + Some("application/x-protobuf"), + "Expected protobuf content-type" + ); + + let api_key = trace_req + .headers + .iter() + .find(|(k, _)| k.to_lowercase() == "dd-api-key") + .map(|(_, v)| v.as_str()); + assert_eq!(api_key, Some("test-api-key"), "Expected API key header"); + + assert!( + !trace_req.body.is_empty(), + "Expected non-empty trace payload" + ); +} + /// Create a test config with TCP transport -pub fn create_tcp_test_config() -> Config { +pub fn create_tcp_test_config(port: u16) -> Config { Config { dd_site: "mock-datadoghq.com".to_string(), - dd_apm_receiver_port: 8126, + dd_apm_receiver_port: port, dd_apm_windows_pipe_name: None, dd_dogstatsd_port: 8125, dd_dogstatsd_windows_pipe_name: None, @@ -49,8 +130,9 @@ pub fn create_tcp_test_config() -> Config { #[cfg(test)] #[tokio::test] +#[serial] async fn test_mini_agent_tcp_handles_requests() { - let config = Arc::new(create_tcp_test_config()); + let config = Arc::new(create_tcp_test_config(8126)); let test_port = config.dd_apm_receiver_port; let mini_agent = MiniAgent { config: config.clone(), @@ -144,7 +226,7 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior let pipe_name = "dd_trace_integration_test"; let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections - let mut config = create_tcp_test_config(); + let mut config = create_tcp_test_config(0); config.dd_apm_windows_pipe_name = Some(pipe_path.clone()); config.dd_apm_receiver_port = 0; let config = Arc::new(config); @@ -217,3 +299,104 @@ async fn test_mini_agent_named_pipe_handles_requests() { // Clean up agent_handle.abort(); } + +#[cfg(test)] +#[tokio::test] +#[serial] +async fn test_mini_agent_tcp_with_real_flushers() { + let mock_server: MockServer = MockServer::start().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let mut config = create_tcp_test_config(8127); + configure_mock_endpoints(&mut config, &mock_server.url()); + let config = Arc::new(config); + let test_port = config.dd_apm_receiver_port; + + let mini_agent = create_mini_agent_with_real_flushers(config); + + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Wait for server to be ready + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent server failed to start within timeout" + ); + + // Send trace data + let trace_payload = create_test_trace_payload(); + let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request"); + assert_eq!(trace_response.status(), StatusCode::OK); + + // Wait for flush + tokio::time::sleep(FLUSH_WAIT_DURATION).await; + + verify_trace_request(&mock_server); + + agent_handle.abort(); +} + +#[cfg(all(test, windows))] +#[tokio::test] +#[serial] +async fn test_mini_agent_named_pipe_with_real_flushers() { + let mock_server = MockServer::start().await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test"; + let mut config = create_tcp_test_config(0); + configure_mock_endpoints(&mut config, &mock_server.url()); + config.dd_apm_windows_pipe_name = Some(pipe_name.to_string()); + config.dd_apm_receiver_port = 0; + let config = Arc::new(config); + + let mini_agent = create_mini_agent_with_real_flushers(config); + + let agent_handle = tokio::spawn(async move { + let _ = mini_agent.start_mini_agent().await; + }); + + // Wait for server to be ready + let mut server_ready = false; + for _ in 0..20 { + tokio::time::sleep(Duration::from_millis(50)).await; + if let Ok(response) = send_named_pipe_request(pipe_name, "/info", "GET", None).await { + if response.status().is_success() { + server_ready = true; + break; + } + } + } + assert!( + server_ready, + "Mini agent named pipe server failed to start within timeout" + ); + + // Send trace data via named pipe + let trace_payload = create_test_trace_payload(); + let trace_response = + send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload)) + .await + .expect("Failed to send /v0.4/traces request over named pipe"); + assert_eq!(trace_response.status(), StatusCode::OK); + + // Wait for flush + tokio::time::sleep(FLUSH_WAIT_DURATION).await; + + verify_trace_request(&mock_server); + + agent_handle.abort(); +}