diff --git a/sentry_streams/src/batch_step.rs b/sentry_streams/src/batch_step.rs index 28e51e20..f3e8cdbd 100644 --- a/sentry_streams/src/batch_step.rs +++ b/sentry_streams/src/batch_step.rs @@ -10,6 +10,7 @@ use crate::pipeline_stats::get_stats; use crate::routes::{Route, RoutedValue}; use crate::time_helpers::current_epoch; use crate::utils::traced_with_gil; +use chrono::{DateTime, Utc}; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyList}; use sentry_arroyo::processing::strategies::{ @@ -24,6 +25,12 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const METRIC_BATCH_SIZE: &str = "streams.pipeline.batch.size"; const METRIC_BATCH_TIME_MS: &str = "streams.pipeline.batch.time_ms"; const METRIC_BATCH_SUBMIT_DURATION_MS: &str = "streams.pipeline.batch.submit_duration_ms"; +const METRIC_BATCH_SUBMIT_REJECTED: &str = "streams.pipeline.batch.submit_rejected"; + +/// How often the debounced "MessageRejected" log line and counter metric may be +/// emitted. Submits are retried in a tight loop under backpressure, so we +/// aggregate rejections instead of emitting one per attempt. +const REJECTED_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(3); fn first_element_schema(py: Python<'_>, first: &PyStreamingMessage) -> Option { match first { @@ -219,6 +226,12 @@ pub struct BatchStep { pending_batch: bool, commit_request_carried_over: Option, step_labels: Vec<(String, String)>, + /// Rejections accumulated since the last debounced emission. + rejected_count: u64, + /// Wall-clock time of the first rejection in the current (un-emitted) window. + rejected_first_at: Option, + /// Monotonic time of the last debounced emission. + rejected_last_emitted: Option, } impl BatchStep { @@ -242,7 +255,45 @@ impl BatchStep { pending_batch: false, commit_request_carried_over: None, step_labels, + rejected_count: 0, + rejected_first_at: None, + rejected_last_emitted: None, + } + } + + /// Records a `MessageRejected` outcome during outbound draining. To avoid + /// flooding the logs and metrics in the tight backpressure retry loop, the + /// log line and counter metric are emitted at most once per + /// [`REJECTED_DEBOUNCE_INTERVAL`], carrying the number of occurrences since + /// the last emission and the timestamp of the first one. + fn record_rejected_submit(&mut self, submit_duration_ms: f64, outbound_len: usize) { + let now = Instant::now(); + self.rejected_count += 1; + self.rejected_first_at.get_or_insert_with(SystemTime::now); + + let due = self.rejected_last_emitted.map_or(true, |t| { + now.duration_since(t) >= REJECTED_DEBOUNCE_INTERVAL + }); + if !due { + return; } + + let first_at = self.rejected_first_at.map(|t| DateTime::::from(t)); + log::debug!( + "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}, occurrences: {:?}, first_rejected_at: {:?}", + self.step_name, + submit_duration_ms, + outbound_len, + "message_rejected", + self.rejected_count, + first_at, + ); + metrics::counter!(METRIC_BATCH_SUBMIT_REJECTED, &self.step_labels) + .increment(self.rejected_count); + + self.rejected_count = 0; + self.rejected_first_at = None; + self.rejected_last_emitted = Some(now); } /// Tries to drain the queue containing the pending messages. @@ -262,18 +313,27 @@ impl BatchStep { let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; metrics::gauge!(METRIC_BATCH_SUBMIT_DURATION_MS, &self.step_labels) .set(submit_duration_ms); - let submit_outcome = match &submit_result { - Ok(()) => "ok", - Err(SubmitError::MessageRejected(_)) => "message_rejected", - Err(SubmitError::InvalidMessage(_)) => "invalid_message", - }; - log::debug!( - "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}", - self.step_name, - submit_duration_ms, - outbound_len, - submit_outcome - ); + match &submit_result { + // MessageRejected happens on every retry of a stalled submit, i.e. in a tight + // loop, so it is debounced separately rather than logged here. + Err(SubmitError::MessageRejected(_)) => { + self.record_rejected_submit(submit_duration_ms, outbound_len) + } + other => { + let submit_outcome = match other { + Ok(()) => "ok", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + Err(SubmitError::MessageRejected(_)) => unreachable!(), + }; + log::debug!( + "BatchStep drain submit. step: {:?}, duration_ms: {:?}, outbound_len: {:?}, outcome: {:?}", + self.step_name, + submit_duration_ms, + outbound_len, + submit_outcome + ); + } + } match submit_result { Ok(()) => { if self.pending_batch { @@ -857,4 +917,139 @@ mod tests { }); } } + + mod rejected_debounce { + //! [`BatchStep::record_rejected_submit`]: the debounced log/metric emitted + //! for `MessageRejected` outcomes while draining the outbound queue. + + use super::super::{BatchStep, METRIC_BATCH_SUBMIT_REJECTED, REJECTED_DEBOUNCE_INTERVAL}; + use crate::fake_strategy::FakeStrategy; + use crate::routes::Route; + use metrics::{ + Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit, + }; + use std::sync::{Arc, Mutex}; + use std::time::{Duration, Instant}; + + /// Minimal `metrics` recorder that captures counter increments. Only the + /// counter path is exercised by `record_rejected_submit`. + struct CounterRecorder { + counters: Arc>>, + } + + impl Recorder for CounterRecorder { + fn describe_counter(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_gauge(&self, _: KeyName, _: Option, _: SharedString) {} + fn describe_histogram(&self, _: KeyName, _: Option, _: SharedString) {} + fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter { + Counter::from_arc(Arc::new(CaptureCounter { + key: key.clone(), + counters: Arc::clone(&self.counters), + })) + } + fn register_gauge(&self, _: &Key, _: &Metadata<'_>) -> Gauge { + Gauge::noop() + } + fn register_histogram(&self, _: &Key, _: &Metadata<'_>) -> Histogram { + Histogram::noop() + } + } + + struct CaptureCounter { + key: Key, + counters: Arc>>, + } + + impl metrics::CounterFn for CaptureCounter { + fn increment(&self, value: u64) { + self.counters + .lock() + .unwrap() + .push((self.key.clone(), value)); + } + fn absolute(&self, _: u64) {} + } + + fn build_step() -> BatchStep { + let sub = Arc::new(Mutex::new(Vec::new())); + let wms = Arc::new(Mutex::new(Vec::new())); + let next = FakeStrategy::new(sub, wms, false); + BatchStep::new( + Route::new("s".into(), vec!["w".into()]), + None, + None, + "test_batch".to_string(), + Box::new(next), + ) + } + + /// All values emitted to the rejected-submit counter, in order. + fn rejected_emissions(recorded: &[(Key, u64)]) -> Vec { + recorded + .iter() + .filter(|(k, _)| k.name() == METRIC_BATCH_SUBMIT_REJECTED) + .map(|(_, v)| *v) + .collect() + } + + #[test] + fn first_rejection_emits_metric_immediately() { + let counters = Arc::new(Mutex::new(Vec::new())); + let recorder = CounterRecorder { + counters: Arc::clone(&counters), + }; + let _guard = metrics::set_default_local_recorder(&recorder); + + let mut step = build_step(); + step.record_rejected_submit(1.0, 3); + + assert_eq!(rejected_emissions(&counters.lock().unwrap()), vec![1]); + } + + #[test] + fn rejections_within_interval_are_debounced() { + let counters = Arc::new(Mutex::new(Vec::new())); + let recorder = CounterRecorder { + counters: Arc::clone(&counters), + }; + let _guard = metrics::set_default_local_recorder(&recorder); + + let mut step = build_step(); + // First emits immediately; the rest fall inside the debounce window. + step.record_rejected_submit(1.0, 0); + step.record_rejected_submit(1.0, 0); + step.record_rejected_submit(1.0, 0); + + assert_eq!( + rejected_emissions(&counters.lock().unwrap()), + vec![1], + "only the first rejection should emit within the interval" + ); + } + + #[test] + fn emits_accumulated_count_after_interval() { + let counters = Arc::new(Mutex::new(Vec::new())); + let recorder = CounterRecorder { + counters: Arc::clone(&counters), + }; + let _guard = metrics::set_default_local_recorder(&recorder); + + let mut step = build_step(); + step.record_rejected_submit(1.0, 0); // emits 1, resets the window + step.record_rejected_submit(1.0, 0); // accumulates (pending = 1) + step.record_rejected_submit(1.0, 0); // accumulates (pending = 2) + + // Pretend the debounce window has elapsed since the last emission. + step.rejected_last_emitted = + Some(Instant::now() - REJECTED_DEBOUNCE_INTERVAL - Duration::from_secs(1)); + step.record_rejected_submit(1.0, 0); // now due: emits the accumulated 3 + + assert_eq!( + rejected_emissions(&counters.lock().unwrap()), + vec![1, 3], + "immediate first emit, then the accumulated count once the interval passes" + ); + } + } } diff --git a/sentry_streams/src/python_operator.rs b/sentry_streams/src/python_operator.rs index b0a2de23..ee30a143 100644 --- a/sentry_streams/src/python_operator.rs +++ b/sentry_streams/src/python_operator.rs @@ -29,6 +29,49 @@ const METRIC_PYTHON_ADAPTER_NEXT_STEP_SUBMIT_DURATION_MS: &str = import_exception!(arroyo.processing.strategies, MessageRejected); import_exception!(arroyo.dlq, InvalidMessage); +/// How often the per-submit debug logs may be written. These fire on every +/// message (and on every retry under backpressure), so they are debounced. +const SUBMIT_LOG_DEBOUNCE_INTERVAL: Duration = Duration::from_secs(5); + +/// A `log::debug!` line emitted at most once per `interval`, hiding the +/// debouncing from the call site. +/// +/// The message is built lazily (only when the line is actually written, so the +/// format cost is paid once per `interval` rather than on every call) and the +/// number of occurrences suppressed since the last emission is appended +/// automatically. +struct DebouncedLogger { + interval: Duration, + count: u64, + last_emitted: Option, +} + +impl DebouncedLogger { + fn new(interval: Duration) -> Self { + Self { + interval, + count: 0, + last_emitted: None, + } + } + + /// Records one occurrence and writes the message if the debounce window has + /// elapsed. `message` is only invoked when the line is emitted. + fn debug String>(&mut self, message: F) { + let now = Instant::now(); + self.count += 1; + let due = self + .last_emitted + .map_or(true, |t| now.duration_since(t) >= self.interval); + if !due { + return; + } + log::debug!("{} occurrences: {:?}", message(), self.count); + self.count = 0; + self.last_emitted = Some(now); + } +} + /// PythonAdapter is an Arroyo processing strategy that delegates the /// processing of messages to a Python class that extends the /// `RustOperatorDelegate` class. @@ -48,6 +91,11 @@ pub struct PythonAdapter { next_strategy: Box>, commit_request_carried_over: Option, step_labels: Vec<(String, String)>, + /// Debounces the per-submit log in [`submit`](Self::submit). + submit_logger: DebouncedLogger, + /// Debounces the per-submit log when forwarding to the next step in + /// [`poll`](Self::poll). + next_step_submit_logger: DebouncedLogger, } impl PythonAdapter { @@ -67,6 +115,8 @@ impl PythonAdapter { transformed_messages: VecDeque::new(), commit_request_carried_over: None, step_labels, + submit_logger: DebouncedLogger::new(SUBMIT_LOG_DEBOUNCE_INTERVAL), + next_step_submit_logger: DebouncedLogger::new(SUBMIT_LOG_DEBOUNCE_INTERVAL), } }) } @@ -156,20 +206,34 @@ impl ProcessingStrategy for PythonAdapter { let submit_duration_ms = submit_start.elapsed().as_secs_f64() * 1000.0; metrics::gauge!(METRIC_PYTHON_ADAPTER_SUBMIT_DURATION_MS, &self.step_labels) .set(submit_duration_ms); - log::debug!( - "PythonAdapter submit. duration_ms: {:?}", - submit_duration_ms - ); let Err(py_err) = res else { + log::debug!( + "PythonAdapter submit. duration_ms: {:?}, outcome: {:?}", + submit_duration_ms, + "ok" + ); return Ok(()); }; if py_err.is_instance(py, &py.get_type::()) { + // MessageRejected is retried in a tight backpressure loop, so this + // path is the only one worth debouncing. + self.submit_logger.debug(|| { + format!( + "PythonAdapter submit. duration_ms: {:?}, outcome: {:?}", + submit_duration_ms, "message_rejected" + ) + }); Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected { message }, )) } else if py_err.is_instance(py, &py.get_type::()) { + log::debug!( + "PythonAdapter submit. duration_ms: {:?}, outcome: {:?}", + submit_duration_ms, + "invalid_message" + ); let val = py_err.value(py); let offset: u64 = val .getattr("offset") @@ -242,17 +306,31 @@ impl ProcessingStrategy for PythonAdapter { &self.step_labels ) .set(submit_duration_ms); - let submit_outcome = match &submit_result { - Ok(()) => "ok", - Err(SubmitError::MessageRejected(_)) => "message_rejected", - Err(SubmitError::InvalidMessage(_)) => "invalid_message", - }; - log::debug!( - "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", - submit_duration_ms, - pending_len, - submit_outcome - ); + match &submit_result { + // MessageRejected is retried in a tight backpressure loop, so this + // path is the only one worth debouncing. + Err(SubmitError::MessageRejected(_)) => { + self.next_step_submit_logger.debug(|| { + format!( + "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", + submit_duration_ms, pending_len, "message_rejected" + ) + }); + } + other => { + let submit_outcome = match other { + Ok(()) => "ok", + Err(SubmitError::InvalidMessage(_)) => "invalid_message", + Err(SubmitError::MessageRejected(_)) => unreachable!(), + }; + log::debug!( + "PythonAdapter next_step submit. duration_ms: {:?}, pending_len: {:?}, outcome: {:?}", + submit_duration_ms, + pending_len, + submit_outcome + ); + } + } match submit_result { Err(SubmitError::MessageRejected( sentry_arroyo::processing::strategies::MessageRejected {