Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions crates/datadog-trace-agent/tests/common/mock_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Simple mock HTTP server for testing flushers

use http_body_util::BodyExt;
use hyper::{body::Incoming, Request, Response};
use hyper_util::rt::TokioIo;
use libdd_common::hyper_migration;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;

#[derive(Clone, Debug)]
pub struct ReceivedRequest {
pub method: String,
pub path: String,
pub headers: Vec<(String, String)>,
pub body: Vec<u8>,
}

pub struct MockServer {
pub addr: SocketAddr,
pub received_requests: Arc<Mutex<Vec<ReceivedRequest>>>,
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}

impl MockServer {
/// Start a mock HTTP server on a random port
pub async fn start() -> Self {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind mock server");
let addr = listener.local_addr().expect("Failed to get local addr");

let received_requests = Arc::new(Mutex::new(Vec::new()));
let requests_clone = received_requests.clone();

let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();

tokio::spawn(async move {
loop {
tokio::select! {
result = listener.accept() => {
let (stream, _) = match result {
Ok(conn) => conn,
Err(e) => {
eprintln!("Mock server accept error: {}", e);
break;
}
};

let io = TokioIo::new(stream);
let requests = requests_clone.clone();

tokio::spawn(async move {
let service = hyper::service::service_fn(move |req: Request<Incoming>| {
let requests = requests.clone();
async move {
// Capture the request
let method = req.method().to_string();
let path = req.uri().path().to_string();
let headers: Vec<(String, String)> = req
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();

// Read the body
let body_bytes = req
.into_body()
.collect()
.await
.map(|collected| collected.to_bytes().to_vec())
.unwrap_or_default();

// Store the request
requests.lock().unwrap().push(ReceivedRequest {
method,
path,
headers,
body: body_bytes,
});

// Return 200 OK
Ok::<_, hyper::http::Error>(
Response::builder()
.status(200)
.body(hyper_migration::Body::from(r#"{"ok":true}"#))
.unwrap(),
)
}
});

let _ = hyper::server::conn::http1::Builder::new()
.serve_connection(io, service)
.await;
});
}
_ = &mut shutdown_rx => {
break;
}
}
}
});

MockServer {
addr,
received_requests,
shutdown_tx: Some(shutdown_tx),
}
}

/// Get the base URL of the mock server
pub fn url(&self) -> String {
format!("http://{}", self.addr)
}

/// Get all received requests
#[allow(dead_code)]
pub fn get_requests(&self) -> Vec<ReceivedRequest> {
self.received_requests.lock().unwrap().clone()
}

/// Get requests matching a path
pub fn get_requests_for_path(&self, path: &str) -> Vec<ReceivedRequest> {
self.received_requests
.lock()
.unwrap()
.iter()
.filter(|req| req.path == path)
.cloned()
.collect()
}

/// Clear all received requests
#[allow(dead_code)]
pub fn clear_requests(&self) {
self.received_requests.lock().unwrap().clear();
}
}

impl Drop for MockServer {
fn drop(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
1 change: 1 addition & 0 deletions crates/datadog-trace-agent/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
//! Common test utilities, mocks, and helpers for integration tests

pub mod helpers;
pub mod mock_server;
pub mod mocks;
193 changes: 188 additions & 5 deletions crates/datadog-trace-agent/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,107 @@
mod common;

use common::helpers::{create_test_trace_payload, send_tcp_request};
use common::mock_server::MockServer;
use common::mocks::{MockEnvVerifier, MockStatsFlusher, MockStatsProcessor, MockTraceFlusher};
use datadog_trace_agent::{
config::Config, mini_agent::MiniAgent, proxy_flusher::ProxyFlusher,
trace_processor::ServerlessTraceProcessor,
trace_flusher::TraceFlusher, trace_processor::ServerlessTraceProcessor,
};
use http_body_util::BodyExt;
use hyper::StatusCode;
use libdd_trace_utils::trace_utils;
use serde_json::Value;
use serial_test::serial;
use std::sync::Arc;
use std::time::Duration;

#[cfg(windows)]
use common::helpers::send_named_pipe_request;

const FLUSH_WAIT_DURATION: Duration = Duration::from_millis(1500);

/// Helper to configure a config with mock server endpoints
pub fn configure_mock_endpoints(config: &mut Config, mock_server_url: &str) {
let trace_url = format!("{}/api/v0.2/traces", mock_server_url);
let stats_url = format!("{}/api/v0.6/stats", mock_server_url);

config.trace_intake = libdd_common::Endpoint {
url: trace_url.parse().unwrap(),
api_key: Some("test-api-key".into()),
..Default::default()
};
config.trace_stats_intake = libdd_common::Endpoint {
url: stats_url.parse().unwrap(),
api_key: Some("test-api-key".into()),
..Default::default()
};
config.trace_flush_interval_secs = 1;
config.stats_flush_interval_secs = 1;
}

/// Helper to create a mini agent with real flushers
pub fn create_mini_agent_with_real_flushers(config: Arc<Config>) -> MiniAgent {
use datadog_trace_agent::{
aggregator::TraceAggregator, stats_flusher::ServerlessStatsFlusher,
stats_processor::ServerlessStatsProcessor, trace_flusher::ServerlessTraceFlusher,
};

let aggregator = Arc::new(tokio::sync::Mutex::new(TraceAggregator::default()));
MiniAgent {
config: config.clone(),
trace_processor: Arc::new(ServerlessTraceProcessor {}),
trace_flusher: Arc::new(ServerlessTraceFlusher::new(
aggregator.clone(),
config.clone(),
)),
stats_processor: Arc::new(ServerlessStatsProcessor {}),
stats_flusher: Arc::new(ServerlessStatsFlusher {}),
env_verifier: Arc::new(MockEnvVerifier),
proxy_flusher: Arc::new(ProxyFlusher::new(config.clone())),
}
}

/// Helper to verify trace request sent to mock server
pub fn verify_trace_request(mock_server: &common::mock_server::MockServer) {
let trace_reqs = mock_server.get_requests_for_path("/api/v0.2/traces");

assert!(
!trace_reqs.is_empty(),
"Expected at least one trace request to mock server"
);

let trace_req = &trace_reqs[0];
assert_eq!(trace_req.method, "POST", "Expected POST method");

let content_type = trace_req
.headers
.iter()
.find(|(k, _)| k.to_lowercase() == "content-type")
.map(|(_, v)| v.as_str());
assert_eq!(
content_type,
Some("application/x-protobuf"),
"Expected protobuf content-type"
);

let api_key = trace_req
.headers
.iter()
.find(|(k, _)| k.to_lowercase() == "dd-api-key")
.map(|(_, v)| v.as_str());
assert_eq!(api_key, Some("test-api-key"), "Expected API key header");

assert!(
!trace_req.body.is_empty(),
"Expected non-empty trace payload"
);
}

/// Create a test config with TCP transport
pub fn create_tcp_test_config() -> Config {
pub fn create_tcp_test_config(port: u16) -> Config {
Config {
dd_site: "mock-datadoghq.com".to_string(),
dd_apm_receiver_port: 8126,
dd_apm_receiver_port: port,
dd_apm_windows_pipe_name: None,
dd_dogstatsd_port: 8125,
dd_dogstatsd_windows_pipe_name: None,
Expand All @@ -49,8 +130,9 @@ pub fn create_tcp_test_config() -> Config {

#[cfg(test)]
#[tokio::test]
#[serial]
async fn test_mini_agent_tcp_handles_requests() {
let config = Arc::new(create_tcp_test_config());
let config = Arc::new(create_tcp_test_config(8126));
let test_port = config.dd_apm_receiver_port;
let mini_agent = MiniAgent {
config: config.clone(),
Expand Down Expand Up @@ -144,7 +226,7 @@ async fn test_mini_agent_named_pipe_handles_requests() {
// Use just the pipe name without \\.\pipe\ prefix, matching datadog-agent behavior
let pipe_name = "dd_trace_integration_test";
let pipe_path = format!(r"\\.\pipe\{}", pipe_name); // Full path for client connections
let mut config = create_tcp_test_config();
let mut config = create_tcp_test_config(0);
config.dd_apm_windows_pipe_name = Some(pipe_path.clone());
config.dd_apm_receiver_port = 0;
let config = Arc::new(config);
Expand Down Expand Up @@ -217,3 +299,104 @@ async fn test_mini_agent_named_pipe_handles_requests() {
// Clean up
agent_handle.abort();
}

#[cfg(test)]
#[tokio::test]
#[serial]
async fn test_mini_agent_tcp_with_real_flushers() {
let mock_server: MockServer = MockServer::start().await;
tokio::time::sleep(Duration::from_millis(50)).await;

let mut config = create_tcp_test_config(8127);
configure_mock_endpoints(&mut config, &mock_server.url());
let config = Arc::new(config);
let test_port = config.dd_apm_receiver_port;

let mini_agent = create_mini_agent_with_real_flushers(config);

let agent_handle = tokio::spawn(async move {
let _ = mini_agent.start_mini_agent().await;
});

// Wait for server to be ready
let mut server_ready = false;
for _ in 0..20 {
tokio::time::sleep(Duration::from_millis(50)).await;
if let Ok(response) = send_tcp_request(test_port, "/info", "GET", None).await {
if response.status().is_success() {
server_ready = true;
break;
}
}
}
assert!(
server_ready,
"Mini agent server failed to start within timeout"
);

// Send trace data
let trace_payload = create_test_trace_payload();
let trace_response = send_tcp_request(test_port, "/v0.4/traces", "POST", Some(trace_payload))
.await
.expect("Failed to send /v0.4/traces request");
assert_eq!(trace_response.status(), StatusCode::OK);

// Wait for flush
tokio::time::sleep(FLUSH_WAIT_DURATION).await;

verify_trace_request(&mock_server);

agent_handle.abort();
}

#[cfg(all(test, windows))]
#[tokio::test]
#[serial]
async fn test_mini_agent_named_pipe_with_real_flushers() {
let mock_server = MockServer::start().await;
tokio::time::sleep(Duration::from_millis(50)).await;

let pipe_name = r"\\.\pipe\dd_trace_real_flusher_test";
let mut config = create_tcp_test_config(0);
configure_mock_endpoints(&mut config, &mock_server.url());
config.dd_apm_windows_pipe_name = Some(pipe_name.to_string());
config.dd_apm_receiver_port = 0;
let config = Arc::new(config);

let mini_agent = create_mini_agent_with_real_flushers(config);

let agent_handle = tokio::spawn(async move {
let _ = mini_agent.start_mini_agent().await;
});

// Wait for server to be ready
let mut server_ready = false;
for _ in 0..20 {
tokio::time::sleep(Duration::from_millis(50)).await;
if let Ok(response) = send_named_pipe_request(pipe_name, "/info", "GET", None).await {
if response.status().is_success() {
server_ready = true;
break;
}
}
}
assert!(
server_ready,
"Mini agent named pipe server failed to start within timeout"
);

// Send trace data via named pipe
let trace_payload = create_test_trace_payload();
let trace_response =
send_named_pipe_request(pipe_name, "/v0.4/traces", "POST", Some(trace_payload))
.await
.expect("Failed to send /v0.4/traces request over named pipe");
assert_eq!(trace_response.status(), StatusCode::OK);

// Wait for flush
tokio::time::sleep(FLUSH_WAIT_DURATION).await;

verify_trace_request(&mock_server);

agent_handle.abort();
}
Loading