diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ec1af99ab..01923c458 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -219,34 +219,32 @@ impl Processor { self.enhanced_metrics.increment_invocation_metric(timestamp); self.enhanced_metrics.set_invoked_received(); - // MANAGED INSTANCE MODE: Check for buffered UniversalInstrumentationStart with request_id - if self.aws_config.is_managed_instance_mode() { - if let Some(buffered_event) = self - .context_buffer - .take_universal_instrumentation_start_for_request_id(&request_id) + // Both modes: check for buffered UniversalInstrumentationStart by request_id first. + // Falls back to FIFO for on-demand when the tracer does not send the header. + if let Some(buffered_event) = self + .context_buffer + .take_universal_instrumentation_start_for_request_id(&request_id) + { + debug!( + "Found buffered UniversalInstrumentationStart for request_id: {}", + request_id + ); + // Infer span + self.inferrer + .infer_span(&buffered_event.payload_value, &self.aws_config); + self.process_on_universal_instrumentation_start( + request_id, + buffered_event.headers, + buffered_event.payload_value, + ); + } else if !self.aws_config.is_managed_instance_mode() { + // ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers. + if let Some((headers, payload_value)) = + self.context_buffer.pair_invoke_event(&request_id) { - debug!( - "Managed Instance: Found buffered UniversalInstrumentationStart for request_id: {}", - request_id - ); - // Infer span - self.inferrer - .infer_span(&buffered_event.payload_value, &self.aws_config); - self.process_on_universal_instrumentation_start( - request_id, - buffered_event.headers, - buffered_event.payload_value, - ); + self.inferrer.infer_span(&payload_value, &self.aws_config); + self.process_on_universal_instrumentation_start(request_id, headers, payload_value); } - return; - } - - // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) - // If `UniversalInstrumentationStart` event happened first, process it - if let Some((headers, payload_value)) = self.context_buffer.pair_invoke_event(&request_id) { - // Infer span - self.inferrer.infer_span(&payload_value, &self.aws_config); - self.process_on_universal_instrumentation_start(request_id, headers, payload_value); } } @@ -521,25 +519,23 @@ impl Processor { self.context_buffer .add_runtime_duration(request_id, metrics.duration_ms); - // MANAGED INSTANCE MODE: Check for buffered UniversalInstrumentationEnd with request_id - if self.aws_config.is_managed_instance_mode() { - if let Some(buffered_event) = self - .context_buffer - .take_universal_instrumentation_end_for_request_id(request_id) - { - debug!( - "Managed Instance: Found buffered UniversalInstrumentationEnd for request_id: {}", - request_id - ); - self.process_on_universal_instrumentation_end( - request_id.clone(), - buffered_event.headers, - buffered_event.payload_value, - ); - } - } else { - // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) - // If `UniversalInstrumentationEnd` event happened first, process it first + // Both modes: check for buffered UniversalInstrumentationEnd by request_id first. + // Falls back to FIFO for on-demand when the tracer does not send the header. + if let Some(buffered_event) = self + .context_buffer + .take_universal_instrumentation_end_for_request_id(request_id) + { + debug!( + "Found buffered UniversalInstrumentationEnd for request_id: {}", + request_id + ); + self.process_on_universal_instrumentation_end( + request_id.clone(), + buffered_event.headers, + buffered_event.payload_value, + ); + } else if !self.aws_config.is_managed_instance_mode() { + // ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers. if let Some((headers, payload)) = self .context_buffer .pair_platform_runtime_done_event(request_id) @@ -970,40 +966,42 @@ impl Processor { self.inferrer.infer_span(&payload_value, &self.aws_config); - // MANAGED INSTANCE MODE: Use request ID-based pairing for concurrent invocations - if self.aws_config.is_managed_instance_mode() { - if let Some(req_id) = request_id { + // Both modes: use request_id-based pairing when the tracer sends the header. + if let Some(req_id) = request_id { + debug!( + "Processing UniversalInstrumentationStart for request_id: {}", + req_id + ); + if self + .context_buffer + .pair_universal_instrumentation_start_with_request_id( + &req_id, + &headers, + &payload_value, + ) + { + // Invoke event already happened, process immediately + self.process_on_universal_instrumentation_start(req_id, headers, payload_value); + } else { + // Buffered for later pairing when Invoke event arrives debug!( - "Managed Instance: Processing UniversalInstrumentationStart for request_id: {}", + "Buffered UniversalInstrumentationStart for request_id: {}", req_id ); - if self - .context_buffer - .pair_universal_instrumentation_start_with_request_id( - &req_id, - &headers, - &payload_value, - ) - { - // Invoke event already happened, process immediately - self.process_on_universal_instrumentation_start(req_id, headers, payload_value); - } else { - // Buffered for later pairing when Invoke event arrives - debug!( - "Managed Instance: Buffered UniversalInstrumentationStart for request_id: {}", - req_id - ); - } - return; } - // Missing request_id in managed instance mode - log warning and fall back to FIFO + return; + } + + // Missing request_id: warn in Managed Instance mode (concurrent invocations make FIFO + // unsafe), fall back to FIFO for on-demand. + if self.aws_config.is_managed_instance_mode() { warn!( "Managed Instance: UniversalInstrumentationStart missing request_id header. \ Falling back to FIFO pairing (may cause incorrect pairing with concurrent invocations)" ); } - // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) + // ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers. if let Some(request_id) = self .context_buffer .pair_universal_instrumentation_start_event(&headers, &payload_value) @@ -1185,40 +1183,42 @@ impl Processor { payload_value: Value, request_id: Option, ) { - // MANAGED INSTANCE MODE: Use request ID-based pairing for concurrent invocations - if self.aws_config.is_managed_instance_mode() { - if let Some(req_id) = request_id { + // Both modes: use request_id-based pairing when the tracer sends the header. + if let Some(req_id) = request_id { + debug!( + "Processing UniversalInstrumentationEnd for request_id: {}", + req_id + ); + if self + .context_buffer + .pair_universal_instrumentation_end_with_request_id( + &req_id, + &headers, + &payload_value, + ) + { + // PlatformRuntimeDone already happened, process immediately + self.process_on_universal_instrumentation_end(req_id, headers, payload_value); + } else { + // Buffered for later pairing when PlatformRuntimeDone arrives debug!( - "Managed Instance: Processing UniversalInstrumentationEnd for request_id: {}", + "Buffered UniversalInstrumentationEnd for request_id: {}", req_id ); - if self - .context_buffer - .pair_universal_instrumentation_end_with_request_id( - &req_id, - &headers, - &payload_value, - ) - { - // PlatformRuntimeDone already happened, process immediately - self.process_on_universal_instrumentation_end(req_id, headers, payload_value); - } else { - // Buffered for later pairing when PlatformRuntimeDone arrives - debug!( - "Managed Instance: Buffered UniversalInstrumentationEnd for request_id: {}", - req_id - ); - } - return; } - // Missing request_id in managed instance mode - log warning and fall back to FIFO + return; + } + + // Missing request_id: warn in Managed Instance mode (concurrent invocations make FIFO + // unsafe), fall back to FIFO for on-demand. + if self.aws_config.is_managed_instance_mode() { warn!( "Managed Instance: UniversalInstrumentationEnd missing request_id header. \ Falling back to FIFO pairing (may cause incorrect pairing with concurrent invocations)" ); } - // ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged) + // ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers. // If `PlatformRuntimeDone` event happened first, process if let Some(request_id) = self .context_buffer