Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 93 additions & 93 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,34 +219,32 @@ impl Processor {
self.enhanced_metrics.increment_invocation_metric(timestamp);
self.enhanced_metrics.set_invoked_received();

// MANAGED INSTANCE MODE: Check for buffered UniversalInstrumentationStart with request_id
if self.aws_config.is_managed_instance_mode() {
if let Some(buffered_event) = self
.context_buffer
.take_universal_instrumentation_start_for_request_id(&request_id)
// Both modes: check for buffered UniversalInstrumentationStart by request_id first.
// Falls back to FIFO for on-demand when the tracer does not send the header.
if let Some(buffered_event) = self
.context_buffer
.take_universal_instrumentation_start_for_request_id(&request_id)
{
Comment on lines +222 to +227
debug!(
"Found buffered UniversalInstrumentationStart for request_id: {}",
request_id
);
// Infer span
self.inferrer
.infer_span(&buffered_event.payload_value, &self.aws_config);
self.process_on_universal_instrumentation_start(
request_id,
buffered_event.headers,
buffered_event.payload_value,
);
} else if !self.aws_config.is_managed_instance_mode() {
// ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers.
if let Some((headers, payload_value)) =
self.context_buffer.pair_invoke_event(&request_id)
{
debug!(
"Managed Instance: Found buffered UniversalInstrumentationStart for request_id: {}",
request_id
);
// Infer span
self.inferrer
.infer_span(&buffered_event.payload_value, &self.aws_config);
self.process_on_universal_instrumentation_start(
request_id,
buffered_event.headers,
buffered_event.payload_value,
);
self.inferrer.infer_span(&payload_value, &self.aws_config);
self.process_on_universal_instrumentation_start(request_id, headers, payload_value);
}
return;
}

// ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged)
// If `UniversalInstrumentationStart` event happened first, process it
if let Some((headers, payload_value)) = self.context_buffer.pair_invoke_event(&request_id) {
// Infer span
self.inferrer.infer_span(&payload_value, &self.aws_config);
self.process_on_universal_instrumentation_start(request_id, headers, payload_value);
}
}

Expand Down Expand Up @@ -521,25 +519,23 @@ impl Processor {
self.context_buffer
.add_runtime_duration(request_id, metrics.duration_ms);

// MANAGED INSTANCE MODE: Check for buffered UniversalInstrumentationEnd with request_id
if self.aws_config.is_managed_instance_mode() {
if let Some(buffered_event) = self
.context_buffer
.take_universal_instrumentation_end_for_request_id(request_id)
{
debug!(
"Managed Instance: Found buffered UniversalInstrumentationEnd for request_id: {}",
request_id
);
self.process_on_universal_instrumentation_end(
request_id.clone(),
buffered_event.headers,
buffered_event.payload_value,
);
}
} else {
// ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged)
// If `UniversalInstrumentationEnd` event happened first, process it first
// Both modes: check for buffered UniversalInstrumentationEnd by request_id first.
// Falls back to FIFO for on-demand when the tracer does not send the header.
if let Some(buffered_event) = self
.context_buffer
.take_universal_instrumentation_end_for_request_id(request_id)
{
debug!(
"Found buffered UniversalInstrumentationEnd for request_id: {}",
request_id
);
self.process_on_universal_instrumentation_end(
request_id.clone(),
buffered_event.headers,
buffered_event.payload_value,
);
} else if !self.aws_config.is_managed_instance_mode() {
// ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers.
if let Some((headers, payload)) = self
.context_buffer
.pair_platform_runtime_done_event(request_id)
Expand Down Expand Up @@ -970,40 +966,42 @@ impl Processor {

self.inferrer.infer_span(&payload_value, &self.aws_config);

// MANAGED INSTANCE MODE: Use request ID-based pairing for concurrent invocations
if self.aws_config.is_managed_instance_mode() {
if let Some(req_id) = request_id {
// Both modes: use request_id-based pairing when the tracer sends the header.
if let Some(req_id) = request_id {
debug!(
"Processing UniversalInstrumentationStart for request_id: {}",
req_id
);
if self
.context_buffer
.pair_universal_instrumentation_start_with_request_id(
&req_id,
&headers,
&payload_value,
)
{
// Invoke event already happened, process immediately
self.process_on_universal_instrumentation_start(req_id, headers, payload_value);
} else {
// Buffered for later pairing when Invoke event arrives
debug!(
"Managed Instance: Processing UniversalInstrumentationStart for request_id: {}",
"Buffered UniversalInstrumentationStart for request_id: {}",
req_id
);
if self
.context_buffer
.pair_universal_instrumentation_start_with_request_id(
&req_id,
&headers,
&payload_value,
)
{
// Invoke event already happened, process immediately
self.process_on_universal_instrumentation_start(req_id, headers, payload_value);
} else {
// Buffered for later pairing when Invoke event arrives
debug!(
"Managed Instance: Buffered UniversalInstrumentationStart for request_id: {}",
req_id
);
}
return;
}
// Missing request_id in managed instance mode - log warning and fall back to FIFO
return;
}

// Missing request_id: warn in Managed Instance mode (concurrent invocations make FIFO
// unsafe), fall back to FIFO for on-demand.
if self.aws_config.is_managed_instance_mode() {
warn!(
"Managed Instance: UniversalInstrumentationStart missing request_id header. \
Falling back to FIFO pairing (may cause incorrect pairing with concurrent invocations)"
);
}

// ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged)
// ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers.
if let Some(request_id) = self
.context_buffer
.pair_universal_instrumentation_start_event(&headers, &payload_value)
Expand Down Expand Up @@ -1185,40 +1183,42 @@ impl Processor {
payload_value: Value,
request_id: Option<String>,
) {
// MANAGED INSTANCE MODE: Use request ID-based pairing for concurrent invocations
if self.aws_config.is_managed_instance_mode() {
if let Some(req_id) = request_id {
// Both modes: use request_id-based pairing when the tracer sends the header.
if let Some(req_id) = request_id {
debug!(
"Processing UniversalInstrumentationEnd for request_id: {}",
req_id
);
if self
.context_buffer
.pair_universal_instrumentation_end_with_request_id(
&req_id,
&headers,
&payload_value,
)
{
// PlatformRuntimeDone already happened, process immediately
self.process_on_universal_instrumentation_end(req_id, headers, payload_value);
} else {
// Buffered for later pairing when PlatformRuntimeDone arrives
debug!(
"Managed Instance: Processing UniversalInstrumentationEnd for request_id: {}",
"Buffered UniversalInstrumentationEnd for request_id: {}",
req_id
);
if self
.context_buffer
.pair_universal_instrumentation_end_with_request_id(
&req_id,
&headers,
&payload_value,
)
{
// PlatformRuntimeDone already happened, process immediately
self.process_on_universal_instrumentation_end(req_id, headers, payload_value);
} else {
// Buffered for later pairing when PlatformRuntimeDone arrives
debug!(
"Managed Instance: Buffered UniversalInstrumentationEnd for request_id: {}",
req_id
);
}
return;
}
// Missing request_id in managed instance mode - log warning and fall back to FIFO
return;
}

// Missing request_id: warn in Managed Instance mode (concurrent invocations make FIFO
// unsafe), fall back to FIFO for on-demand.
if self.aws_config.is_managed_instance_mode() {
warn!(
"Managed Instance: UniversalInstrumentationEnd missing request_id header. \
Falling back to FIFO pairing (may cause incorrect pairing with concurrent invocations)"
);
}

// ON-DEMAND MODE: Use existing FIFO pairing logic (unchanged)
// ON-DEMAND MODE FIFO fallback: for tracers that do not send request_id headers.
// If `PlatformRuntimeDone` event happened first, process
if let Some(request_id) = self
.context_buffer
Expand Down
Loading