Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 207 additions & 12 deletions sentry_streams/src/batch_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::pipeline_stats::get_stats;
use crate::routes::{Route, RoutedValue};
use crate::time_helpers::current_epoch;
use crate::utils::traced_with_gil;
use chrono::{DateTime, Utc};
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyList};
use sentry_arroyo::processing::strategies::{
Expand All @@ -24,6 +25,12 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// Metric key for the batch size (presumably the number of elements per
/// batch; confirm at the emission site).
const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size";
/// Metric key for the batch time in milliseconds (which interval is measured
/// should be confirmed at the emission site).
const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms";
/// Gauge key set to the elapsed time, in milliseconds, measured around each
/// submit attempt made while draining the outbound queue.
const METRIC_BATCH_SUBMIT_DURATION_MS: &str = "streams.pipeline.batch.submit_duration_ms";
/// Counter key incremented by `BatchStep::record_rejected_submit` with the
/// number of `MessageRejected` outcomes accumulated since the previous emission.
const METRIC_BATCH_SUBMIT_REJECTED: &str = "streams.pipeline.batch.submit_rejected";

/// Minimum spacing between two emissions of the debounced "MessageRejected"
/// log line and counter metric. Submits are retried in a tight loop under
/// backpressure, so rejections are aggregated over this window instead of
/// being emitted once per attempt.
const REJECTED_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(3);

fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option<String> {
match first {
Expand Down Expand Up @@ -219,6 +226,12 @@ pub struct BatchStep {
pending_batch: bool,
commit_request_carried_over: Option<CommitRequest>,
step_labels: Vec<(String, String)>,
/// Rejections accumulated since the last debounced emission.
rejected_count: u64,
/// Wall-clock time of the first rejection in the current (un-emitted) window.
rejected_first_at: Option<SystemTime>,
/// Monotonic time of the last debounced emission.
rejected_last_emitted: Option<Instant>,
}

impl BatchStep {
Expand All @@ -242,7 +255,45 @@ impl BatchStep {
pending_batch: false,
commit_request_carried_over: None,
step_labels,
rejected_count: 0,
rejected_first_at: None,
rejected_last_emitted: None,
}
}

/// Accounts for one `MessageRejected` outcome seen while draining the
/// outbound queue.
///
/// Every call bumps the pending rejection count and, if this is the first
/// rejection of the current window, remembers its wall-clock time. The debug
/// log line and the [`METRIC_BATCH_SUBMIT_REJECTED`] counter are only emitted
/// when no emission has happened yet or when at least
/// [`REJECTED_DEBOUNCE_INTERVAL`] has passed since the last one; the emission
/// carries the number of rejections accumulated since then and the timestamp
/// of the first of them, after which the window is reset.
///
/// `submit_duration_ms` and `outbound_len` are only used in the log line and
/// describe the attempt that triggered the emission, not the whole window.
///
/// Note that rejections accumulated inside the window are reported only by a
/// later call to this method; this method itself never flushes them on a timer.
fn record_rejected_submit(&mut self, submit_duration_ms: f64, outbound_len: usize) {
    let now = Instant::now();
    self.rejected_count += 1;
    if self.rejected_first_at.is_none() {
        self.rejected_first_at = Some(SystemTime::now());
    }

    // Still inside the debounce window of the previous emission: keep
    // accumulating and stay silent.
    if let Some(last_emitted) = self.rejected_last_emitted {
        if now.duration_since(last_emitted) < REJECTED_DEBOUNCE_INTERVAL {
            return;
        }
    }

    let occurrences = self.rejected_count;
    let first_rejected_at = self.rejected_first_at.map(DateTime::<Utc>::from);
    log::debug!(
        "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}, occurrences: {:?}, first_rejected_at: {:?}",
        self.step_name,
        submit_duration_ms,
        outbound_len,
        "message_rejected",
        occurrences,
        first_rejected_at,
    );
    metrics::counter!(METRIC_BATCH_SUBMIT_REJECTED, &self.step_labels).increment(occurrences);

    // Start a fresh window, anchored at the monotonic time of this emission.
    self.rejected_last_emitted = Some(now);
    self.rejected_first_at = None;
    self.rejected_count = 0;
}

/// Tries to drain the queue containing the pending messages.
Expand All @@ -262,18 +313,27 @@ impl BatchStep {
let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0;
metrics::gauge!(METRIC_BATCH_SUBMIT_DURATION_MS, &self.step_labels)
.set(submit_duration_ms);
let submit_outcome = match &submit_result {
Ok(()) => "ok",
Err(SubmitError::MessageRejected(_)) => "message_rejected",
Err(SubmitError::InvalidMessage(_)) => "invalid_message",
};
log::debug!(
"BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}",
self.step_name,
submit_duration_ms,
outbound_len,
submit_outcome
);
match &submit_result {
// MessageRejected happens on every retry of a stalled submit, i.e. in a tight
// loop, so it is debounced separately rather than logged here.
Err(SubmitError::MessageRejected(_)) => {
self.record_rejected_submit(submit_duration_ms, outbound_len)
}
other => {
let submit_outcome = match other {
Ok(()) => "ok",
Err(SubmitError::InvalidMessage(_)) => "invalid_message",
Err(SubmitError::MessageRejected(_)) => unreachable!(),
};
log::debug!(
"BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}",
self.step_name,
submit_duration_ms,
outbound_len,
submit_outcome
);
}
}
match submit_result {
Ok(()) => {
if self.pending_batch {
Expand Down Expand Up @@ -857,4 +917,139 @@ mod tests {
});
}
}

mod rejected_debounce {
//! [`BatchStep::record_rejected_submit`]: the debounced log/metric emitted
//! for `MessageRejected` outcomes while draining the outbound queue.

use super::super::{BatchStep, METRIC_BATCH_SUBMIT_REJECTED, REJECTED_DEBOUNCE_INTERVAL};
use crate::fake_strategy::FakeStrategy;
use crate::routes::Route;
use metrics::{
Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit,
};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Test-only [`Recorder`] that captures counter increments in a shared list.
///
/// `record_rejected_submit` only touches a counter, so gauges and histograms
/// are handed out as no-ops and the `describe_*` hooks do nothing.
struct CounterRecorder {
    /// Shared list of `(key, increment)` pairs, appended to by every counter
    /// this recorder registers.
    counters: Arc<Mutex<Vec<(Key, u64)>>>,
}

impl Recorder for CounterRecorder {
    fn describe_counter(&self, _name: KeyName, _unit: Option<Unit>, _description: SharedString) {}

    fn describe_gauge(&self, _name: KeyName, _unit: Option<Unit>, _description: SharedString) {}

    fn describe_histogram(&self, _name: KeyName, _unit: Option<Unit>, _description: SharedString) {
    }

    /// Returns a counter that appends its increments to the shared list.
    fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
        let capture = CaptureCounter {
            key: key.clone(),
            counters: Arc::clone(&self.counters),
        };
        Counter::from_arc(Arc::new(capture))
    }

    fn register_gauge(&self, _key: &Key, _metadata: &Metadata<'_>) -> Gauge {
        Gauge::noop()
    }

    fn register_histogram(&self, _key: &Key, _metadata: &Metadata<'_>) -> Histogram {
        Histogram::noop()
    }
}

/// Counter handle that records each increment, tagged with the key it was
/// registered under, into a shared list.
struct CaptureCounter {
    /// Key this counter was registered under.
    key: Key,
    /// Shared list that every increment is appended to.
    counters: Arc<Mutex<Vec<(Key, u64)>>>,
}

impl metrics::CounterFn for CaptureCounter {
    /// Appends `(key, value)` to the shared list; panics if the mutex is poisoned.
    fn increment(&self, value: u64) {
        let mut recorded = self.counters.lock().unwrap();
        recorded.push((self.key.clone(), value));
    }

    /// Absolute updates are ignored.
    fn absolute(&self, _value: u64) {}
}

/// Builds the `BatchStep` under test: named `"test_batch"`, on the route
/// built from `"s"` and `["w"]`, with a `FakeStrategy` as its next step.
///
/// The two `None` arguments and the `false` flag are passed through as-is;
/// their meaning is defined by `BatchStep::new` and `FakeStrategy::new`.
fn build_step() -> BatchStep {
    let sub_buf = Arc::new(Mutex::new(Vec::new()));
    let wms_buf = Arc::new(Mutex::new(Vec::new()));
    let next_step = FakeStrategy::new(sub_buf, wms_buf, false);

    let route = Route::new("s".into(), vec!["w".into()]);
    let step_name = String::from("test_batch");
    BatchStep::new(route, None, None, step_name, Box::new(next_step))
}

/// Extracts, in recording order, the increments captured for the
/// rejected-submit counter, dropping entries recorded under any other key.
fn rejected_emissions(recorded: &[(Key, u64)]) -> Vec<u64> {
    recorded
        .iter()
        .filter_map(|(key, value)| {
            (key.name() == METRIC_BATCH_SUBMIT_REJECTED).then_some(*value)
        })
        .collect()
}

/// With no previous emission recorded, a single rejection reaches the
/// counter right away with a count of 1.
#[test]
fn first_rejection_emits_metric_immediately() {
    let captured = Arc::new(Mutex::new(Vec::new()));
    let recorder = CounterRecorder {
        counters: Arc::clone(&captured),
    };
    let _guard = metrics::set_default_local_recorder(&recorder);

    let mut batch_step = build_step();
    batch_step.record_rejected_submit(1.0, 3);

    let emitted = rejected_emissions(&captured.lock().unwrap());
    assert_eq!(emitted, vec![1]);
}

/// Back-to-back rejections inside one debounce window produce a single
/// counter emission: the immediate one for the first rejection.
#[test]
fn rejections_within_interval_are_debounced() {
    let captured = Arc::new(Mutex::new(Vec::new()));
    let recorder = CounterRecorder {
        counters: Arc::clone(&captured),
    };
    let _guard = metrics::set_default_local_recorder(&recorder);

    let mut batch_step = build_step();
    // The first call emits immediately; the following two land inside the
    // debounce window it opened.
    for _ in 0..3 {
        batch_step.record_rejected_submit(1.0, 0);
    }

    let emitted = rejected_emissions(&captured.lock().unwrap());
    assert_eq!(
        emitted,
        vec![1],
        "only the first rejection should emit within the interval"
    );
}

/// Rejections swallowed by the debounce window are reported as one
/// accumulated count by the first rejection that arrives after the window.
#[test]
fn emits_accumulated_count_after_interval() {
    let captured = Arc::new(Mutex::new(Vec::new()));
    let recorder = CounterRecorder {
        counters: Arc::clone(&captured),
    };
    let _guard = metrics::set_default_local_recorder(&recorder);

    let mut batch_step = build_step();
    // Call 1 emits a count of 1 and resets the window; calls 2 and 3 only
    // accumulate (pending count becomes 1, then 2).
    batch_step.record_rejected_submit(1.0, 0);
    batch_step.record_rejected_submit(1.0, 0);
    batch_step.record_rejected_submit(1.0, 0);

    // Backdate the last emission so the debounce window looks expired.
    let expired_by = REJECTED_DEBOUNCE_INTERVAL + Duration::from_secs(1);
    batch_step.rejected_last_emitted = Some(Instant::now() - expired_by);
    // This call is due: it emits the two pending rejections plus itself.
    batch_step.record_rejected_submit(1.0, 0);

    let emitted = rejected_emissions(&captured.lock().unwrap());
    assert_eq!(
        emitted,
        vec![1, 3],
        "immediate first emit, then the accumulated count once the interval passes"
    );
}
}
}
Loading
Loading