From 75be834ac7130030be1a71265fd8bb45833c1509 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Sat, 6 Jun 2026 18:11:31 -0700 Subject: [PATCH] perf(streams): Debounce noisy MessageRejected backpressure logs Under backpressure, submit is retried in a tight loop, so the per-attempt debug logs for MessageRejected outcomes flooded the logs. In BatchStep, the rejected drain-submit log and a new streams.pipeline.batch.submit_rejected counter metric are now emitted at most once per 3s, carrying the accumulated occurrence count and the timestamp of the first rejection. Ok and InvalidMessage outcomes still log every time. In PythonAdapter, a DebouncedLogger (5s) wraps the two per-submit debug logs and is applied only to the MessageRejected outcome; Ok and InvalidMessage log unchanged. Co-Authored-By: Claude Opus 4.8 --- sentry_streams/src/batch_step.rs | 219 ++++++++++++++++++++++++-- sentry_streams/src/python_operator.rs | 108 +++++++++++-- 2 files changed, 300 insertions(+), 27 deletions(-) diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 28e51e20..f3e8cdbd 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -10,6 +10,7 @@ use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; use crate::utils::traced_with_gil; +use chrono::{DateTime, Utc}; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyList}; use sentry_arroyo::processing::strategies::{ @@ -24,6 +25,12 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size"; const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms"; const METRIC_BATCH_SUBMIT_DURATION_MS: &str = "streams.pipeline.batch.submit_duration_ms"; +const METRIC_BATCH_SUBMIT_REJECTED: &str = "streams.pipeline.batch.submit_rejected"; + +/// How often the debounced "MessageRejected" log line and counter metric may be +/// emitted. Submits are retried in a tight loop under backpressure, so we +/// aggregate rejections instead of emitting one per attempt. +const REJECTED_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(3); fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { @@ -219,6 +226,12 @@ pub struct BatchStep { pending_batch: bool, commit_request_carried_over: Option, step_labels: Vec<(String, String)>, + /// Rejections accumulated since the last debounced emission. + rejected_count: u64, + /// Wall-clock time of the first rejection in the current (un-emitted) window. + rejected_first_at: Option, + /// Monotonic time of the last debounced emission. + rejected_last_emitted: Option, } impl BatchStep { @@ -242,7 +255,45 @@ impl BatchStep { pending_batch: false, commit_request_carried_over: None, step_labels, + rejected_count: 0, + rejected_first_at: None, + rejected_last_emitted: None, + } + } + + /// Records a `MessageRejected` outcome during outbound draining. To avoid + /// flooding the logs and metrics in the tight backpressure retry loop, the + /// log line and counter metric are emitted at most once per + /// [`REJECTED_DEBOUNCE_INTERVAL`], carrying the number of occurrences since + /// the last emission and the timestamp of the first one. + fn record_rejected_submit(&mut self, submit_duration_ms: f64, outbound_len: usize) { + let now = Instant::now(); + self.rejected_count += 1; + self.rejected_first_at.get_or_insert_with(SystemTime::now); + + let due = self.rejected_last_emitted.map_or(true, |t| { + now.duration_since(t) >= REJECTED_DEBOUNCE_INTERVAL + }); + if !due { + return; } + + let first_at = self.rejected_first_at.map(|t| DateTime::::from(t)); + log::debug!( + "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}, occurrences: {:?}, first_rejected_at: {:?}", + self.step_name, + submit_duration_ms, + outbound_len, + "message_rejected", + self.rejected_count, + first_at, + ); + metrics::counter!(METRIC_BATCH_SUBMIT_REJECTED, &self.step_labels) + .increment(self.rejected_count); + + self.rejected_count = 0; + self.rejected_first_at = None; + self.rejected_last_emitted = Some(now); } /// Tries to drain the queue containing the pending messages. @@ -262,18 +313,27 @@ impl BatchStep { let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; metrics::gauge!(METRIC_BATCH_SUBMIT_DURATION_MS, &self.step_labels) .set(submit_duration_ms); - let submit_outcome = match &submit_result { - Ok(()) => "ok", - Err(SubmitError::MessageRejected(_)) => "message_rejected", - Err(SubmitError::InvalidMessage(_)) => "invalid_message", - }; - log::debug!( - "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}", - self.step_name, - submit_duration_ms, - outbound_len, - submit_outcome - ); + match &submit_result { + // MessageRejected happens on every retry of a stalled submit, i.e. in a tight + // loop, so it is debounced separately rather than logged here. + Err(SubmitError::MessageRejected(_)) => { + self.record_rejected_submit(submit_duration_ms, outbound_len) + } + other => { + let submit_outcome = match other { + Ok(()) => "ok", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + Err(SubmitError::MessageRejected(_)) => unreachable!(), + }; + log::debug!( + "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}", + self.step_name, + submit_duration_ms, + outbound_len, + submit_outcome + ); + } + } match submit_result { Ok(()) => { if self.pending_batch { @@ -857,4 +917,139 @@ mod tests { }); } } + + mod rejected_debounce { + //! [`BatchStep::record_rejected_submit`]: the debounced log/metric emitted + //! for `MessageRejected` outcomes while draining the outbound queue. + + use super::super::{BatchStep, METRIC_BATCH_SUBMIT_REJECTED, REJECTED_DEBOUNCE_INTERVAL}; + use crate::fake_strategy::FakeStrategy; + use crate::routes::Route; + use metrics::{ + Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit, + }; + use std::sync::{Arc, Mutex}; + use std::time::{Duration, Instant}; + + /// Minimal `metrics` recorder that captures counter increments. Only the + /// counter path is exercised by `record_rejected_submit`. + struct CounterRecorder { + counters: Arc>>, + } + + impl Recorder for CounterRecorder { + fn describe_counter(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_gauge(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_histogram(&self, _: KeyName, _: Option, _: SharedString) {} + fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter { + Counter::from_arc(Arc::new(CaptureCounter { + key: key.clone(), + counters: Arc::clone(&self.counters), + })) + } + fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> Gauge { + Gauge::noop() + } + fn register_histogram(&self, _: &Key, _: &Metadata<'_>) -> Histogram { + Histogram::noop() + } + } + + struct CaptureCounter { + key: Key, + counters: Arc>>, + } + + impl metrics::CounterFn for CaptureCounter { + fn increment(&self, value: u64) { + self.counters + .lock() + .unwrap() + .push((self.key.clone(), value)); + } + fn absolute(&self, _: u64) {} + } + + fn build_step() -> BatchStep { + let sub = Arc::new(Mutex::new(Vec::new())); + let wms = Arc::new(Mutex::new(Vec::new())); + let next = FakeStrategy::new(sub, wms, false); + BatchStep::new( + Route::new("s".into(), vec!["w".into()]), + None, + None, + "test_batch".to_string(), + Box::new(next), + ) + } + + /// All values emitted to the rejected-submit counter, in order. + fn rejected_emissions(recorded: &[(Key, u64)]) -> Vec { + recorded + .iter() + .filter(|(k, _)| k.name() == METRIC_BATCH_SUBMIT_REJECTED) + .map(|(_, v)| *v) + .collect() + } + + #[test] + fn first_rejection_emits_metric_immediately() { + let counters = Arc::new(Mutex::new(Vec::new())); + let recorder = CounterRecorder { + counters: Arc::clone(&counters), + }; + let _guard = metrics::set_default_local_recorder(&recorder); + + let mut step = build_step(); + step.record_rejected_submit(1.0, 3); + + assert_eq!(rejected_emissions(&counters.lock().unwrap()), vec![1]); + } + + #[test] + fn rejections_within_interval_are_debounced() { + let counters = Arc::new(Mutex::new(Vec::new())); + let recorder = CounterRecorder { + counters: Arc::clone(&counters), + }; + let _guard = metrics::set_default_local_recorder(&recorder); + + let mut step = build_step(); + // First emits immediately; the rest fall inside the debounce window. + step.record_rejected_submit(1.0, 0); + step.record_rejected_submit(1.0, 0); + step.record_rejected_submit(1.0, 0); + + assert_eq!( + rejected_emissions(&counters.lock().unwrap()), + vec![1], + "only the first rejection should emit within the interval" + ); + } + + #[test] + fn emits_accumulated_count_after_interval() { + let counters = Arc::new(Mutex::new(Vec::new())); + let recorder = CounterRecorder { + counters: Arc::clone(&counters), + }; + let _guard = metrics::set_default_local_recorder(&recorder); + + let mut step = build_step(); + step.record_rejected_submit(1.0, 0); // emits 1, resets the window + step.record_rejected_submit(1.0, 0); // accumulates (pending = 1) + step.record_rejected_submit(1.0, 0); // accumulates (pending = 2) + + // Pretend the debounce window has elapsed since the last emission. + step.rejected_last_emitted = + Some(Instant::now() - REJECTED_DEBOUNCE_INTERVAL - Duration::from_secs(1)); + step.record_rejected_submit(1.0, 0); // now due: emits the accumulated 3 + + assert_eq!( + rejected_emissions(&counters.lock().unwrap()), + vec![1, 3], + "immediate first emit, then the accumulated count once the interval passes" + ); + } + } } diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index b0a2de23..ee30a143 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -29,6 +29,49 @@ const METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS: &str = import_exception!(arroyo.processing.strategies, MessageRejected); import_exception!(arroyo.dlq, InvalidMessage); +/// How often the per-submit debug logs may be written. These fire on every +/// message (and on every retry under backpressure), so they are debounced. +const SUBMIT_LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(5); + +/// A `log::debug!` line emitted at most once per `interval`, hiding the +/// debouncing from the call site. +/// +/// The message is built lazily (only when the line is actually written, so the +/// format cost is paid once per `interval` rather than on every call) and the +/// number of occurrences suppressed since the last emission is appended +/// automatically. +struct DebouncedLogger { + interval: Duration, + count: u64, + last_emitted: Option, +} + +impl DebouncedLogger { + fn new(interval: Duration) -> Self { + Self { + interval, + count: 0, + last_emitted: None, + } + } + + /// Records one occurrence and writes the message if the debounce window has + /// elapsed. `message` is only invoked when the line is emitted. + fn debug String>(&mut self, message: F) { + let now = Instant::now(); + self.count += 1; + let due = self + .last_emitted + .map_or(true, |t| now.duration_since(t) >= self.interval); + if !due { + return; + } + log::debug!("{} occurrences: {:?}", message(), self.count); + self.count = 0; + self.last_emitted = Some(now); + } +} + /// PythonAdapter is an Arroyo processing strategy that delegates the /// processing of messages to a Python class that extends the /// `RustOperatorDelegate` class. @@ -48,6 +91,11 @@ pub struct PythonAdapter { next_strategy: Box>, commit_request_carried_over: Option, step_labels: Vec<(String, String)>, + /// Debounces the per-submit log in [`submit`](Self::submit). + submit_logger: DebouncedLogger, + /// Debounces the per-submit log when forwarding to the next step in + /// [`poll`](Self::poll). + next_step_submit_logger: DebouncedLogger, } impl PythonAdapter { @@ -67,6 +115,8 @@ impl PythonAdapter { transformed_messages: VecDeque::new(), commit_request_carried_over: None, step_labels, + submit_logger: DebouncedLogger::new(SUBMIT_LOG_DEBOUNCE_INTERVAL), + next_step_submit_logger: DebouncedLogger::new(SUBMIT_LOG_DEBOUNCE_INTERVAL), } }) } @@ -156,20 +206,34 @@ impl ProcessingStrategy for PythonAdapter { let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; metrics::gauge!(METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS, &self.step_labels) .set(submit_duration_ms); - log::debug!( - "PythonAdapter submit. duration_ms: {:?}", - submit_duration_ms - ); let Err(py_err) = res else { + log::debug!( + "PythonAdapter submit. duration_ms: {:?}, outcome: {:?}", + submit_duration_ms, + "ok" + ); return Ok(()); }; if py_err.is_instance(py, &py.get_type::()) { + // MessageRejected is retried in a tight backpressure loop, so this + // path is the only one worth debouncing. + self.submit_logger.debug(|| { + format!( + "PythonAdapter submit. duration_ms: {:?}, outcome: {:?}", + submit_duration_ms, "message_rejected" + ) + }); Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected { message }, )) } else if py_err.is_instance(py, &py.get_type::()) { + log::debug!( + "PythonAdapter submit. duration_ms: {:?}, outcome: {:?}", + submit_duration_ms, + "invalid_message" + ); let val = py_err.value(py); let offset: u64 = val .getattr("offset") @@ -242,17 +306,31 @@ impl ProcessingStrategy for PythonAdapter { &self.step_labels ) .set(submit_duration_ms); - let submit_outcome = match &submit_result { - Ok(()) => "ok", - Err(SubmitError::MessageRejected(_)) => "message_rejected", - Err(SubmitError::InvalidMessage(_)) => "invalid_message", - }; - log::debug!( - "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", - submit_duration_ms, - pending_len, - submit_outcome - ); + match &submit_result { + // MessageRejected is retried in a tight backpressure loop, so this + // path is the only one worth debouncing. + Err(SubmitError::MessageRejected(_)) => { + self.next_step_submit_logger.debug(|| { + format!( + "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", + submit_duration_ms, pending_len, "message_rejected" + ) + }); + } + other => { + let submit_outcome = match other { + Ok(()) => "ok", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + Err(SubmitError::MessageRejected(_)) => unreachable!(), + }; + log::debug!( + "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", + submit_duration_ms, + pending_len, + submit_outcome + ); + } + } match submit_result { Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected {