feat(streams): Add backpressure metrics for consumer strategies#288
Instrument ProcessingStrategy steps with counters and episode histograms for MessageRejected: send_backpressure vs receive_backpressure and matching duration series, using a BackpressureNext wrapper and explicit tracking for StreamSink produce and PythonAdapter. Each metric is labeled by step (operator kind and route). Incomplete episodes at shutdown are not emitted. Co-Authored-By: Cursor <cursoragent@cursor.com> Made-with: Cursor
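The wrapper idea in this commit can be pictured with a minimal sketch. Everything below is hypothetical and only illustrates the technique: `Step`, `Rejected`, `NextSketch`, and `RejectFirst` are invented stand-ins, not the PR's `ProcessingStrategy`, `MessageRejected`, or `BackpressureNext`, and the real code emits metrics through a recorder rather than storing them in fields. Time is passed in explicitly so the behavior is deterministic.

```rust
use std::time::{Duration, Instant};

/// Hypothetical error standing in for a rejected submit.
#[derive(Debug, PartialEq)]
pub struct Rejected;

/// Hypothetical minimal step interface (assumption: one `submit` method).
pub trait Step {
    fn submit(&mut self, msg: u64) -> Result<(), Rejected>;
}

/// Wraps the next step and records the backpressure it exerts:
/// one count per rejection, one duration per completed episode.
pub struct NextSketch<S: Step> {
    inner: S,
    episode_start: Option<Instant>,
    pub rejected_count: u64,
    pub episode_durations: Vec<Duration>,
}

impl<S: Step> NextSketch<S> {
    pub fn new(inner: S) -> Self {
        NextSketch {
            inner,
            episode_start: None,
            rejected_count: 0,
            episode_durations: Vec::new(),
        }
    }

    /// Forwards the message downstream and updates the episode state.
    pub fn submit_at(&mut self, msg: u64, now: Instant) -> Result<(), Rejected> {
        match self.inner.submit(msg) {
            Ok(()) => {
                // A success closes the open episode, if there is one.
                if let Some(start) = self.episode_start.take() {
                    self.episode_durations.push(now.duration_since(start));
                }
                Ok(())
            }
            Err(e) => {
                // Count every rejection; only the first one opens an episode.
                self.rejected_count += 1;
                if self.episode_start.is_none() {
                    self.episode_start = Some(now);
                }
                Err(e)
            }
        }
    }
}

/// Test double that rejects the first `n` submits, then accepts.
pub struct RejectFirst(pub u32);

impl Step for RejectFirst {
    fn submit(&mut self, _msg: u64) -> Result<(), Rejected> {
        if self.0 > 0 {
            self.0 -= 1;
            Err(Rejected)
        } else {
            Ok(())
        }
    }
}
```

In this sketch an episode that is still open when the wrapper is dropped never reaches `episode_durations`, which mirrors the note that incomplete episodes at shutdown are not emitted.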
Use DSL Step.name for the step label: add_step(step, step_name) on the Rust consumer, parallel step_names in build_chain, and wire names from rust_arroyo (including segment_label for chained maps). Rename EpisodeTracker to BackpressureTracker. Document BackpressureNext::poll vs submit for MessageRejected. Co-Authored-By: Cursor <cursoragent@cursor.com> Made-with: Cursor
Move pipeline step labels into RuntimeOperator so consumer assembly no longer tracks a parallel step_names list. This lets Arroyo segment finalization pass a single first-step label through operator construction for backpressure metrics. Co-Authored-By: GPT-5 Codex <noreply@openai.com> Made-with: Cursor
Force-pushed from 7190cbe to 8f9c8ef.
Route backpressure counters and duration histograms through STREAMS_RECORDER and shared streams.pipeline helpers. Restore DogStatsD global labels on the exporter. Add unit tests using metrics with_local_recorder plus DebuggingRecorder, a merged StreamsMetricsRecorder test, and BackpressureNext wired to FakeStrategy for MessageRejected. Co-Authored-By: GPT-5 Codex <noreply@openai.com> Made-with: Cursor
Cursor Bugbot has reviewed your changes and found 1 potential issue.
-    Ok(_) => {}
+    Ok(_) => {
+        recv_on_success(label, &mut self.produce_recv_tracker);
+    }
Missing send_on_success in poll inflates backpressure duration
Medium Severity
When StreamSink::poll successfully retries the carried-over message, recv_on_success is called for produce_recv_tracker but send_on_success is never called for send_tracker. The send_tracker timer started when a prior submit returned MessageRejected (due to message_carried_over being Some). Once poll clears the carried-over message, the step can accept messages again, but the timer keeps running until the next successful submit call. This inflates send_backpressure_duration by the time between the poll clearing the backlog and the next incoming message, which can be significant in low-throughput scenarios. Adding send_on_success(label, &mut self.send_tracker) alongside recv_on_success in the Ok(_) branch of poll would fix the measurement.
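The effect described in this report can be reproduced with a small timeline model. This is a sketch under stated assumptions, not the PR's code: `SendTimer` and `emitted_durations` are hypothetical names, the timer stands in for `send_tracker`, and the timeline is reduced to three events (a rejected submit at time zero, a poll that clears the carried-over message, and the next accepted submit).

```rust
use std::time::{Duration, Instant};

/// Hypothetical minimal episode timer standing in for the send tracker.
struct SendTimer {
    started_at: Option<Instant>,
}

impl SendTimer {
    /// Opens an episode on the first rejection; later rejections are ignored.
    fn on_rejected(&mut self, now: Instant) {
        if self.started_at.is_none() {
            self.started_at = Some(now);
        }
    }

    /// Closes the open episode and returns its duration, if one is open.
    fn on_success(&mut self, now: Instant) -> Option<Duration> {
        self.started_at.take().map(|start| now.duration_since(start))
    }
}

/// Returns the durations emitted for the timeline described above.
/// `close_in_poll` selects whether poll also closes the send episode.
pub fn emitted_durations(
    close_in_poll: bool,
    poll_clears: Duration,
    next_submit: Duration,
) -> Vec<Duration> {
    let t0 = Instant::now();
    let mut timer = SendTimer { started_at: None };
    let mut emitted = Vec::new();

    // Submit is rejected because a carried-over message is still pending.
    timer.on_rejected(t0);

    // Poll retries the carried-over message successfully.
    if close_in_poll {
        emitted.extend(timer.on_success(t0 + poll_clears));
    }

    // The next submit is accepted; this closes the episode if still open.
    emitted.extend(timer.on_success(t0 + next_submit));
    emitted
}
```

With a poll that clears the backlog after 50 ms and the next message arriving after 5 s, the model emits a 5 s episode when only submit closes the timer and a 50 ms episode when poll closes it too, which is the gap the report attributes to low-throughput scenarios.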
    }

    /// End a receive backpressure event after downstream accepted a submit.
    pub fn recv_on_success(step: &str, tracker: &mut BackpressureTracker) {
Why is this a separate function? Why not call the tracker directly in the BackpressureNext class?


Rust arroyo does not record metrics for backpressure unless the exception is propagated back to the consumer.
This PR adds metrics that record backpressure events wherever they occur, together with a metric that tracks how long a step exerts backpressure.
This is the first step to improve the throughput metrics.
What would come next:
Made with Cursor