Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion crates/dogstatsd/src/aggregator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ impl AggregatorService {
AggregatorCommand::InsertBatch(metrics) => {
let mut insert_errors = 0;
for metric in metrics {
let metric_name = metric.name;
// The only possible error here is an overflow
if let Err(_e) = self.aggregator.insert(metric) {
if let Err(e) = self.aggregator.insert(metric) {
warn!("Failed to insert metric '{metric_name}': {e:?}");
insert_errors += 1;
}
}
Expand Down Expand Up @@ -255,4 +257,42 @@ mod tests {
handle.shutdown().expect("Failed to shutdown");
service_task.await.expect("Service task failed");
}

#[tokio::test]
async fn test_aggregator_service_insert_overflow() {
// Create an aggregator with very small capacity to trigger overflow
let (service, handle) =
AggregatorService::new(EMPTY_TAGS, 2).expect("Failed to create aggregator service");

let service_task = tokio::spawn(service.run());

// Create metrics with different names and tags to exceed max_context
let metrics = vec![
parse("metric1:1|c|#tag:a").expect("metric parse failed"),
parse("metric2:2|c|#tag:b").expect("metric parse failed"),
parse("metric3:3|c|#tag:c").expect("metric parse failed"),
parse("metric4:4|c|#tag:d").expect("metric parse failed"),
parse("metric5:5|c|#tag:e").expect("metric parse failed"),
];

// Insert batch - some should fail due to overflow
handle
.insert_batch(metrics)
.expect("Failed to send insert batch");

// Flush and verify that not all metrics were inserted
let response = handle.flush().await.expect("Failed to flush");

// With max_context=2, we should only successfully insert 2 unique metrics
// The rest should fail with overflow errors (which are logged with metric names)
assert!(
response.series[0].series.len() <= 2,
"Expected at most 2 metrics to be inserted, got {}",
response.series[0].series.len()
);

// Shutdown the service
handle.shutdown().expect("Failed to shutdown");
service_task.await.expect("Service task failed");
}
}
Loading