diff --git a/crates/dogstatsd/src/aggregator_service.rs b/crates/dogstatsd/src/aggregator_service.rs index 21bb7bc..eb888dc 100644 --- a/crates/dogstatsd/src/aggregator_service.rs +++ b/crates/dogstatsd/src/aggregator_service.rs @@ -106,8 +106,10 @@ impl AggregatorService { AggregatorCommand::InsertBatch(metrics) => { let mut insert_errors = 0; for metric in metrics { + let metric_name = metric.name; // The only possible error here is an overflow - if let Err(_e) = self.aggregator.insert(metric) { + if let Err(e) = self.aggregator.insert(metric) { + warn!("Failed to insert metric '{metric_name}': {e:?}"); insert_errors += 1; } } @@ -255,4 +257,42 @@ mod tests { handle.shutdown().expect("Failed to shutdown"); service_task.await.expect("Service task failed"); } + + #[tokio::test] + async fn test_aggregator_service_insert_overflow() { + // Create an aggregator with very small capacity to trigger overflow + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 2).expect("Failed to create aggregator service"); + + let service_task = tokio::spawn(service.run()); + + // Create metrics with different names and tags to exceed max_context + let metrics = vec![ + parse("metric1:1|c|#tag:a").expect("metric parse failed"), + parse("metric2:2|c|#tag:b").expect("metric parse failed"), + parse("metric3:3|c|#tag:c").expect("metric parse failed"), + parse("metric4:4|c|#tag:d").expect("metric parse failed"), + parse("metric5:5|c|#tag:e").expect("metric parse failed"), + ]; + + // Insert batch - some should fail due to overflow + handle + .insert_batch(metrics) + .expect("Failed to send insert batch"); + + // Flush and verify that not all metrics were inserted + let response = handle.flush().await.expect("Failed to flush"); + + // With max_context=2, we should only successfully insert 2 unique metrics + // The rest should fail with overflow errors (which are logged with metric names) + assert!( + response.series[0].series.len() <= 2, + "Expected at most 2 metrics to be inserted, got {}", + response.series[0].series.len() + ); + + // Shutdown the service + handle.shutdown().expect("Failed to shutdown"); + service_task.await.expect("Service task failed"); + } }