From 92fb84c2a7db9af3bddb201a2fd27323b603563c Mon Sep 17 00:00:00 2001 From: Astraea Quinn S <249650883+FullyTyped@users.noreply.github.com> Date: Tue, 13 Jan 2026 14:04:01 +0000 Subject: [PATCH 1/5] test: add request-ID isolation in concurrent exec Adds test to ensure request IDs are properly isolated in concurrent Lambda execution with structured logging. Verifies each concurrent invocation maintains unique request ID context. --- lambda-runtime/Cargo.toml | 1 + lambda-runtime/src/runtime.rs | 161 ++++++++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+) diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index e829a72a..dfa227cb 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -73,6 +73,7 @@ idna_adapter = "=1.2.0" lambda_runtime = { path = ".", features = ["tracing", "graceful-shutdown"] } pin-project-lite = { workspace = true } tracing-appender = "0.2" +tracing-subscriber = { version = "0.3", features = ["registry"] } [package.metadata.docs.rs] all-features = true diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 1175b023..c14766c6 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -908,4 +908,165 @@ mod endpoint_tests { server_handle.abort(); Ok(()) } + + #[tokio::test] + #[cfg(feature = "experimental-concurrency")] + async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> { + use std::collections::{HashMap, HashSet}; + use std::sync::Mutex; + use tracing::{info, subscriber::set_global_default}; + use tracing_subscriber::{layer::SubscriberExt, Layer}; + + #[derive(Clone)] + struct LogCapture { + logs: Arc>>>, + } + + impl LogCapture { + fn new() -> Self { + Self { + logs: Arc::new(Mutex::new(Vec::new())), + } + } + } + + impl Layer for LogCapture + where + S: tracing::Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, + { + fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { + let mut fields = HashMap::new(); + struct FieldVisitor<'a>(&'a mut HashMap); + impl<'a> tracing::field::Visit for FieldVisitor<'a> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert(field.name().to_string(), format!("{:?}", value).trim_matches('"').to_string()); + } + } + event.record(&mut FieldVisitor(&mut fields)); + self.logs.lock().unwrap().push(fields); + } + } + + let log_capture = LogCapture::new(); + let subscriber = tracing_subscriber::registry().with(log_capture.clone()); + set_global_default(subscriber).unwrap(); + + let request_count = Arc::new(AtomicUsize::new(0)); + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let base: http::Uri = format!("http://{addr}").parse()?; + + let server_handle = { + let request_count = request_count.clone(); + tokio::spawn(async move { + loop { + let (tcp, _) = match listener.accept().await { + Ok(v) => v, + Err(_) => return, + }; + + let request_count = request_count.clone(); + let service = service_fn(move |req: Request| { + let request_count = request_count.clone(); + async move { + let (parts, body) = req.into_parts(); + if parts.method == Method::POST { + let _ = body.collect().await; + } + + if parts.method == Method::GET && parts.uri.path() == "/2018-06-01/runtime/invocation/next" { + let count = request_count.fetch_add(1, Ordering::SeqCst); + if count < 300 { + let request_id = format!("test-request-{}", count + 1); + let res = Response::builder() + .status(StatusCode::OK) + .header("lambda-runtime-aws-request-id", &request_id) + .header("lambda-runtime-deadline-ms", "9999999999999") + .body(Full::new(Bytes::from_static(b"{}"))) + .unwrap(); + return Ok::<_, Infallible>(res); + } else { + let res = Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Full::new(Bytes::new())) + .unwrap(); + return Ok::<_, Infallible>(res); + } + } + + if parts.method == Method::POST && parts.uri.path().contains("/response") { + let res = Response::builder() + .status(StatusCode::OK) + .body(Full::new(Bytes::new())) + .unwrap(); + return Ok::<_, Infallible>(res); + } + + let res = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Full::new(Bytes::new())) + .unwrap(); + Ok::<_, Infallible>(res) + } + }); + + let io = TokioIo::new(tcp); + tokio::spawn(async move { + let _ = ServerBuilder::new(TokioExecutor::new()).serve_connection(io, service).await; + }); + } + }) + }; + + async fn test_handler(event: crate::LambdaEvent) -> Result<(), Error> { + let request_id = &event.context.request_id; + info!(observed_request_id = request_id); + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(()) + } + + let handler = crate::service_fn(test_handler); + let client = Arc::new(Client::builder().with_endpoint(base).build()?); + let runtime = Runtime { + client: client.clone(), + config: Arc::new(Config { + function_name: "test_fn".to_string(), + memory: 128, + version: "1".to_string(), + log_stream: "test_stream".to_string(), + log_group: "test_log".to_string(), + }), + service: wrap_handler(handler, client), + concurrency_limit: 3, + }; + + let runtime_handle = tokio::spawn(async move { runtime.run_concurrent().await }); + + loop { + tokio::time::sleep(Duration::from_millis(100)).await; + let count = request_count.load(Ordering::SeqCst); + if count >= 300 { + tokio::time::sleep(Duration::from_millis(500)).await; + break; + } + } + + runtime_handle.abort(); + server_handle.abort(); + + let logs = log_capture.logs.lock().unwrap(); + let relevant_logs: Vec<_> = logs.iter().filter(|l| l.contains_key("observed_request_id")).collect(); + + assert!(relevant_logs.len() >= 300, "Should have at least 300 log entries, got {}", relevant_logs.len()); + + let mut seen_ids = HashSet::new(); + for log in &relevant_logs { + let observed_id = log.get("observed_request_id").unwrap(); + assert!(observed_id.starts_with("test-request-"), "Request ID should match pattern: {}", observed_id); + assert!(seen_ids.insert(observed_id.clone()), "Request ID should be unique: {}", observed_id); + } + + println!("✅ Concurrent structured logging test passed with {} unique request IDs", seen_ids.len()); + Ok(()) + } } From 376a1d3dc07aa867ede12a761d1c59e64b955807 Mon Sep 17 00:00:00 2001 From: Astraea Quinn S <249650883+FullyTyped@users.noreply.github.com> Date: Tue, 13 Jan 2026 14:17:31 +0000 Subject: [PATCH 2/5] fmt --- lambda-runtime/src/runtime.rs | 43 +++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index c14766c6..45543270 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -912,8 +912,10 @@ mod endpoint_tests { #[tokio::test] #[cfg(feature = "experimental-concurrency")] async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> { - use std::collections::{HashMap, HashSet}; - use std::sync::Mutex; + use std::{ + collections::{HashMap, HashSet}, + sync::Mutex, + }; use tracing::{info, subscriber::set_global_default}; use tracing_subscriber::{layer::SubscriberExt, Layer}; @@ -939,7 +941,10 @@ mod endpoint_tests { struct FieldVisitor<'a>(&'a mut HashMap); impl<'a> tracing::field::Visit for FieldVisitor<'a> { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - self.0.insert(field.name().to_string(), format!("{:?}", value).trim_matches('"').to_string()); + self.0.insert( + field.name().to_string(), + format!("{:?}", value).trim_matches('"').to_string(), + ); } } event.record(&mut FieldVisitor(&mut fields)); @@ -974,7 +979,8 @@ mod endpoint_tests { let _ = body.collect().await; } - if parts.method == Method::GET && parts.uri.path() == "/2018-06-01/runtime/invocation/next" { + if parts.method == Method::GET && parts.uri.path() == "/2018-06-01/runtime/invocation/next" + { let count = request_count.fetch_add(1, Ordering::SeqCst); if count < 300 { let request_id = format!("test-request-{}", count + 1); @@ -1012,7 +1018,9 @@ mod endpoint_tests { let io = TokioIo::new(tcp); tokio::spawn(async move { - let _ = ServerBuilder::new(TokioExecutor::new()).serve_connection(io, service).await; + let _ = ServerBuilder::new(TokioExecutor::new()) + .serve_connection(io, service) + .await; }); } }) @@ -1050,23 +1058,38 @@ mod endpoint_tests { break; } } - + runtime_handle.abort(); server_handle.abort(); let logs = log_capture.logs.lock().unwrap(); let relevant_logs: Vec<_> = logs.iter().filter(|l| l.contains_key("observed_request_id")).collect(); - assert!(relevant_logs.len() >= 300, "Should have at least 300 log entries, got {}", relevant_logs.len()); + assert!( + relevant_logs.len() >= 300, + "Should have at least 300 log entries, got {}", + relevant_logs.len() + ); let mut seen_ids = HashSet::new(); for log in &relevant_logs { let observed_id = log.get("observed_request_id").unwrap(); - assert!(observed_id.starts_with("test-request-"), "Request ID should match pattern: {}", observed_id); - assert!(seen_ids.insert(observed_id.clone()), "Request ID should be unique: {}", observed_id); + assert!( + observed_id.starts_with("test-request-"), + "Request ID should match pattern: {}", + observed_id + ); + assert!( + seen_ids.insert(observed_id.clone()), + "Request ID should be unique: {}", + observed_id + ); } - println!("✅ Concurrent structured logging test passed with {} unique request IDs", seen_ids.len()); + println!( + "✅ Concurrent structured logging test passed with {} unique request IDs", + seen_ids.len() + ); Ok(()) } } From 98120510128814a48db4d9b33470a2f5b2a5cbd1 Mon Sep 17 00:00:00 2001 From: Astraea Quinn S <249650883+FullyTyped@users.noreply.github.com> Date: Tue, 13 Jan 2026 15:24:30 +0000 Subject: [PATCH 3/5] test: verify span isolation in concurrent execution - Add LogCapture layer with span field tracking via on_new_span() - Add TracingLayer to test runtime service stack - Verify span requestId matches logged observed_request_id - Prove true span context isolation under concurrency --- lambda-runtime/src/runtime.rs | 55 +++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 45543270..575c11ab 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -922,12 +922,14 @@ mod endpoint_tests { #[derive(Clone)] struct LogCapture { logs: Arc>>>, + span_fields: Arc>>>, } impl LogCapture { fn new() -> Self { Self { logs: Arc::new(Mutex::new(Vec::new())), + span_fields: Arc::new(Mutex::new(HashMap::new())), } } } @@ -936,7 +938,29 @@ mod endpoint_tests { where S: tracing::Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, { - fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) { + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::Id, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + if attrs.metadata().name() == "Lambda runtime invoke" { + let mut fields = HashMap::new(); + struct FieldVisitor<'a>(&'a mut HashMap); + impl<'a> tracing::field::Visit for FieldVisitor<'a> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.0.insert( + field.name().to_string(), + format!("{:?}", value).trim_matches('"').to_string(), + ); + } + } + attrs.record(&mut FieldVisitor(&mut fields)); + self.span_fields.lock().unwrap().insert(id.clone(), fields); + } + } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { let mut fields = HashMap::new(); struct FieldVisitor<'a>(&'a mut HashMap); impl<'a> tracing::field::Visit for FieldVisitor<'a> { @@ -948,6 +972,16 @@ mod endpoint_tests { } } event.record(&mut FieldVisitor(&mut fields)); + + // Add span requestId if we're in a Lambda runtime invoke span + if let Some(span) = ctx.lookup_current() { + if let Some(span_fields) = self.span_fields.lock().unwrap().get(&span.id()) { + if let Some(request_id) = span_fields.get("requestId") { + fields.insert("span_request_id".to_string(), request_id.clone()); + } + } + } + self.logs.lock().unwrap().push(fields); } } @@ -1035,6 +1069,14 @@ mod endpoint_tests { let handler = crate::service_fn(test_handler); let client = Arc::new(Client::builder().with_endpoint(base).build()?); + + // Add tracing layer to capture span fields + use crate::layers::trace::TracingLayer; + use tower::ServiceBuilder; + let service = ServiceBuilder::new() + .layer(TracingLayer::new()) + .service(wrap_handler(handler, client.clone())); + let runtime = Runtime { client: client.clone(), config: Arc::new(Config { @@ -1044,7 +1086,7 @@ mod endpoint_tests { log_stream: "test_stream".to_string(), log_group: "test_log".to_string(), }), - service: wrap_handler(handler, client), + service, concurrency_limit: 3, }; @@ -1074,6 +1116,8 @@ mod endpoint_tests { let mut seen_ids = HashSet::new(); for log in &relevant_logs { let observed_id = log.get("observed_request_id").unwrap(); + let span_request_id = log.get("span_request_id").unwrap(); + assert!( observed_id.starts_with("test-request-"), "Request ID should match pattern: {}", @@ -1084,6 +1128,13 @@ mod endpoint_tests { "Request ID should be unique: {}", observed_id ); + + // Verify span request ID matches logged request ID + assert_eq!( + observed_id, span_request_id, + "Span request ID should match logged request ID: span={}, logged={}", + span_request_id, observed_id + ); } println!( From 0ed35a1cb0cba1b2c277336027e8476912ebed26 Mon Sep 17 00:00:00 2001 From: Astraea Quinn S <249650883+FullyTyped@users.noreply.github.com> Date: Wed, 14 Jan 2026 14:14:41 +0000 Subject: [PATCH 4/5] Use tracing_capture --- lambda-runtime/Cargo.toml | 1 + lambda-runtime/src/runtime.rs | 113 ++++++++-------------------------- 2 files changed, 26 insertions(+), 88 deletions(-) diff --git a/lambda-runtime/Cargo.toml b/lambda-runtime/Cargo.toml index dfa227cb..9b02931e 100644 --- a/lambda-runtime/Cargo.toml +++ b/lambda-runtime/Cargo.toml @@ -73,6 +73,7 @@ idna_adapter = "=1.2.0" lambda_runtime = { path = ".", features = ["tracing", "graceful-shutdown"] } pin-project-lite = { workspace = true } tracing-appender = "0.2" +tracing-capture = "0.1.0" tracing-subscriber = { version = "0.3", features = ["registry"] } [package.metadata.docs.rs] diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 575c11ab..48ab0ff0 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -912,83 +912,14 @@ mod endpoint_tests { #[tokio::test] #[cfg(feature = "experimental-concurrency")] async fn test_concurrent_structured_logging_isolation() -> Result<(), Error> { - use std::{ - collections::{HashMap, HashSet}, - sync::Mutex, - }; - use tracing::{info, subscriber::set_global_default}; - use tracing_subscriber::{layer::SubscriberExt, Layer}; - - #[derive(Clone)] - struct LogCapture { - logs: Arc>>>, - span_fields: Arc>>>, - } - - impl LogCapture { - fn new() -> Self { - Self { - logs: Arc::new(Mutex::new(Vec::new())), - span_fields: Arc::new(Mutex::new(HashMap::new())), - } - } - } - - impl Layer for LogCapture - where - S: tracing::Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, - { - fn on_new_span( - &self, - attrs: &tracing::span::Attributes<'_>, - id: &tracing::Id, - _ctx: tracing_subscriber::layer::Context<'_, S>, - ) { - if attrs.metadata().name() == "Lambda runtime invoke" { - let mut fields = HashMap::new(); - struct FieldVisitor<'a>(&'a mut HashMap); - impl<'a> tracing::field::Visit for FieldVisitor<'a> { - fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - self.0.insert( - field.name().to_string(), - format!("{:?}", value).trim_matches('"').to_string(), - ); - } - } - attrs.record(&mut FieldVisitor(&mut fields)); - self.span_fields.lock().unwrap().insert(id.clone(), fields); - } - } + use std::collections::HashSet; + use tracing::info; + use tracing_capture::{CaptureLayer, SharedStorage}; + use tracing_subscriber::layer::SubscriberExt; - fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { - let mut fields = HashMap::new(); - struct FieldVisitor<'a>(&'a mut HashMap); - impl<'a> tracing::field::Visit for FieldVisitor<'a> { - fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - self.0.insert( - field.name().to_string(), - format!("{:?}", value).trim_matches('"').to_string(), - ); - } - } - event.record(&mut FieldVisitor(&mut fields)); - - // Add span requestId if we're in a Lambda runtime invoke span - if let Some(span) = ctx.lookup_current() { - if let Some(span_fields) = self.span_fields.lock().unwrap().get(&span.id()) { - if let Some(request_id) = span_fields.get("requestId") { - fields.insert("span_request_id".to_string(), request_id.clone()); - } - } - } - - self.logs.lock().unwrap().push(fields); - } - } - - let log_capture = LogCapture::new(); - let subscriber = tracing_subscriber::registry().with(log_capture.clone()); - set_global_default(subscriber).unwrap(); + let storage = SharedStorage::default(); + let subscriber = tracing_subscriber::registry().with(CaptureLayer::new(&storage)); + tracing::subscriber::set_global_default(subscriber).unwrap(); let request_count = Arc::new(AtomicUsize::new(0)); let listener = TcpListener::bind("127.0.0.1:0").await?; @@ -1104,19 +1035,29 @@ mod endpoint_tests { runtime_handle.abort(); server_handle.abort(); - let logs = log_capture.logs.lock().unwrap(); - let relevant_logs: Vec<_> = logs.iter().filter(|l| l.contains_key("observed_request_id")).collect(); + let storage = storage.lock(); + let events: Vec<_> = storage + .all_events() + .filter(|e| e.value("observed_request_id").is_some()) + .collect(); assert!( - relevant_logs.len() >= 300, + events.len() >= 300, "Should have at least 300 log entries, got {}", - relevant_logs.len() + events.len() ); let mut seen_ids = HashSet::new(); - for log in &relevant_logs { - let observed_id = log.get("observed_request_id").unwrap(); - let span_request_id = log.get("span_request_id").unwrap(); + for event in &events { + let observed_id = event["observed_request_id"].as_str().unwrap(); + + // Find the parent "Lambda runtime invoke" span and get its requestId + let span_request_id = event + .ancestors() + .find(|s| s.metadata().name() == "Lambda runtime invoke") + .and_then(|s| s.value("requestId")) + .and_then(|v| v.as_str()) + .expect("Event should have a Lambda runtime invoke ancestor with requestId"); assert!( observed_id.starts_with("test-request-"), @@ -1124,7 +1065,7 @@ mod endpoint_tests { observed_id ); assert!( - seen_ids.insert(observed_id.clone()), + seen_ids.insert(observed_id.to_string()), "Request ID should be unique: {}", observed_id ); @@ -1137,10 +1078,6 @@ mod endpoint_tests { ); } - println!( - "✅ Concurrent structured logging test passed with {} unique request IDs", - seen_ids.len() - ); Ok(()) } } From 48ef7f9db69cb5ff88af7e57c844523bcf65b998 Mon Sep 17 00:00:00 2001 From: Astraea Quinn S <249650883+FullyTyped@users.noreply.github.com> Date: Wed, 14 Jan 2026 14:21:40 +0000 Subject: [PATCH 5/5] Use notify channel to indicate done --- lambda-runtime/src/runtime.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lambda-runtime/src/runtime.rs b/lambda-runtime/src/runtime.rs index 48ab0ff0..e9a6bb27 100644 --- a/lambda-runtime/src/runtime.rs +++ b/lambda-runtime/src/runtime.rs @@ -922,12 +922,14 @@ mod endpoint_tests { tracing::subscriber::set_global_default(subscriber).unwrap(); let request_count = Arc::new(AtomicUsize::new(0)); + let done = Arc::new(tokio::sync::Notify::new()); let listener = TcpListener::bind("127.0.0.1:0").await?; let addr = listener.local_addr()?; let base: http::Uri = format!("http://{addr}").parse()?; let server_handle = { let request_count = request_count.clone(); + let done = done.clone(); tokio::spawn(async move { loop { let (tcp, _) = match listener.accept().await { @@ -936,8 +938,10 @@ mod endpoint_tests { }; let request_count = request_count.clone(); + let done = done.clone(); let service = service_fn(move |req: Request| { let request_count = request_count.clone(); + let done = done.clone(); async move { let (parts, body) = req.into_parts(); if parts.method == Method::POST { @@ -957,6 +961,7 @@ mod endpoint_tests { .unwrap(); return Ok::<_, Infallible>(res); } else { + done.notify_one(); let res = Response::builder() .status(StatusCode::NO_CONTENT) .body(Full::new(Bytes::new())) @@ -1023,14 +1028,9 @@ mod endpoint_tests { let runtime_handle = tokio::spawn(async move { runtime.run_concurrent().await }); - loop { - tokio::time::sleep(Duration::from_millis(100)).await; - let count = request_count.load(Ordering::SeqCst); - if count >= 300 { - tokio::time::sleep(Duration::from_millis(500)).await; - break; - } - } + done.notified().await; + // Give handlers time to complete after server signals done + tokio::time::sleep(Duration::from_millis(500)).await; runtime_handle.abort(); server_handle.abort();