Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 203 additions & 2 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,14 @@ pub struct Processor {
durable_context_tx: mpsc::Sender<DurableContextUpdate>,
/// Time of the `SnapStart` restore event, set when `PlatformRestoreStart` is received.
restore_time: Option<Instant>,
/// Whether the init duration metric has already been emitted for this sandbox.
///
/// Init happens once per sandbox lifetime, so the metric must be emitted at most once.
/// The init duration metric is normally emitted from the `platform.initReport` telemetry event,
/// but if it does not arrive / arrives late, we fall back to the `init_duration_ms` field
/// on `platform.report`. This flag ensures whichever event arrives first wins and the other is skipped,
/// preventing double counting.
init_duration_metric_emitted: bool,
}

impl Processor {
Expand Down Expand Up @@ -145,6 +153,7 @@ impl Processor {
awaiting_first_invocation: false,
durable_context_tx,
restore_time: None,
init_duration_metric_emitted: false,
}
}

Expand Down Expand Up @@ -386,8 +395,15 @@ impl Processor {
duration_ms: f64,
timestamp: i64,
) {
self.enhanced_metrics
.set_init_duration_metric(init_type, duration_ms, timestamp);
// `platform.initReport` is the primary source for the init duration metric and carries the exact `init_type`.
// If the init duration metric has already been emitted, it means the init report event was received late /
// the platform report event with init duration was received first.
// In this case, we skip emitting the init duration metric from the platform init report event, to prevent double counting.
if !self.init_duration_metric_emitted {
self.enhanced_metrics
.set_init_duration_metric(init_type, duration_ms, timestamp);
self.init_duration_metric_emitted = true;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know this repo very well, but will this need locking when reading/setting the init_duration_metric_emitted value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only handle one event at a time, so it should not be needed here

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, worth looking into a bit I think. For example, in the off chance you get two events at once, what would happen? Good to know what the race condition would potentially produce, and then if we're okay with that, then there's no need for locking.


// In Managed Instance mode, find the context with empty request_id
// In On-Demand mode, find the closest context by timestamp since we do not have the request_id
Expand Down Expand Up @@ -938,6 +954,20 @@ impl Processor {
self.enhanced_metrics
.set_post_runtime_duration_metric(post_runtime_duration_ms, timestamp);
}

// Fallback for the init duration metric. The `platform.initReport` event is the primary source for the init duration metric,
// but in case it does not arrive / arrives late, we fall back to the `init_duration_ms` field on `platform.report`.
// If the init duration metric has already been emitted on the init report event, we skip emitting the metric here, to prevent double counting.
if !self.init_duration_metric_emitted
&& let Some(init_duration_ms) = metrics.init_duration_ms
{
self.enhanced_metrics.set_init_duration_metric(
InitType::OnDemand,
init_duration_ms,
timestamp,
);
self.init_duration_metric_emitted = true;
}
}

pub fn on_shutdown_event(&mut self) {
Expand Down Expand Up @@ -1996,6 +2026,177 @@ mod tests {
);
}

#[tokio::test]
async fn test_init_duration_emitted_from_init_report() {
let mut processor = setup();
let timestamp = 1_000_000_i64;

// Processor receives the platform init report event
processor.on_platform_init_report(InitType::OnDemand, 200.0, timestamp);

// Check that the init duration metric was inserted into the aggregator
let tags = dogstatsd::metric::SortedTags::parse("init_type:on-demand").ok();
let entry = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INIT_DURATION_METRIC.into(),
tags,
timestamp,
)
.await
.unwrap();

assert!(
entry.is_some(),
"init duration should be emitted when platform.initReport is received"
);

// Check that the init duration metric emitted flag is set to true
assert!(
processor.init_duration_metric_emitted,
"init duration metric emitted flag should be set to true"
);
}

#[tokio::test]
async fn test_init_duration_emitted_from_report() {
let mut processor = setup();
let timestamp = 1_000_000_i64;
let report_metrics = OnDemandReportMetrics {
duration_ms: 100.0,
billed_duration_ms: 100,
memory_size_mb: 128,
max_memory_used_mb: 50,
init_duration_ms: Some(200.0), // init duration is present in the report on a cold start invocation
restore_duration_ms: None,
};

// Processor receives the platform report event
// This should set the init duration metric as the processor is initialized
// with init_duration_metric_emitted set to false
processor.handle_ondemand_report(&"req-1".to_string(), report_metrics, timestamp);

// Check that the init duration metric was inserted into the aggregator
let tags = dogstatsd::metric::SortedTags::parse("init_type:on-demand").ok();
let entry = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INIT_DURATION_METRIC.into(),
tags,
timestamp,
)
.await
.unwrap();
assert!(
entry.is_some(),
"init duration should be emitted from the report fallback"
);

// Check that the init duration metric emitted flag is set to true
assert!(
processor.init_duration_metric_emitted,
"init duration metric emitted flag should be set to true"
);
}

#[tokio::test]
async fn test_init_duration_not_emitted_for_warm_invocation_report() {
let mut processor = setup();
let timestamp = 1_000_000_i64;
let report_metrics = OnDemandReportMetrics {
duration_ms: 100.0,
billed_duration_ms: 100,
memory_size_mb: 128,
max_memory_used_mb: 50,
init_duration_ms: None, // init duration is None for a non-cold start invocation
restore_duration_ms: None,
};

// A warm invocation has no init_duration_ms, so nothing should be emitted
processor.handle_ondemand_report(&"req-1".to_string(), report_metrics, timestamp);

// Check that an init duration metric was NOT inserted into the aggregator
assert!(!processor.init_duration_metric_emitted);
let tags = dogstatsd::metric::SortedTags::parse("init_type:on-demand").ok();
let entry = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INIT_DURATION_METRIC.into(),
tags,
timestamp,
)
.await
.unwrap();
assert!(
entry.is_none(),
"no init duration metric should be inserted on a warm report"
);
}

#[tokio::test]
async fn test_report_does_not_re_emit_init_duration_after_init_report() {
let mut processor = setup();
// Give the init report and the platform report different timestamps so they fall in different aggregation buckets
let init_report_timestamp = 1_000_000_i64;
let platform_report_timestamp = 1_000_100_i64;
let report_metrics = OnDemandReportMetrics {
duration_ms: 100.0,
billed_duration_ms: 100,
memory_size_mb: 128,
max_memory_used_mb: 50,
init_duration_ms: Some(200.0), // init duration is present in the report on a cold start invocation
restore_duration_ms: None,
};

// Processor receives the platform init report event, which emits the init duration metric.
processor.on_platform_init_report(InitType::OnDemand, 200.0, init_report_timestamp);

// Processor then receives the platform report event; the fallback must not emit again.
processor.handle_ondemand_report(
&"req-1".to_string(),
report_metrics,
platform_report_timestamp,
);

// Filter for metrics with the "on-demand" init type
let on_demand = dogstatsd::metric::SortedTags::parse("init_type:on-demand").ok();

// Check that the init duration metric was emitted with the init report timestamp
let from_init_report = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INIT_DURATION_METRIC.into(),
on_demand.clone(),
init_report_timestamp,
)
.await
.unwrap();
assert!(
from_init_report.is_some(),
"platform.initReport should have emitted the init duration metric"
);

// Check that the init duration metric was not also emitted with the platform report timestamp
let from_report = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INIT_DURATION_METRIC.into(),
on_demand,
platform_report_timestamp,
)
.await
.unwrap();
assert!(
from_report.is_none(),
"platform.report should not emit a second init duration metric after platform.initReport"
);
}

#[tokio::test]
async fn test_is_managed_instance_mode_returns_true() {
use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE;
Expand Down
Loading