
feat(streams): Add backpressure metrics for consumer strategies#288

Open
fpacifici wants to merge 4 commits into main from fpacifici/add_backpressure

Conversation

@fpacifici
Collaborator

@fpacifici fpacifici commented Mar 30, 2026

Rust arroyo records backpressure metrics only when the exception propagates back to the consumer.
This PR adds metrics that record backpressure events wherever they happen, together with a metric that tracks how long a step exerts backpressure.

This is the first step to improve the throughput metrics.

  1. Refactored how metrics are initialized. The existing initializer only covered arroyo metrics, so nothing was recorded from the Rust code. Added unit tests as well.
  2. Added a BackpressureTracker that keeps track of the backpressure state of every step and wraps an Arroyo strategy to intercept backpressure events.
  3. Allowed the Python code to pass the name of a step to the Rust code (used to populate tags).
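To illustrate item 2, here is a minimal sketch of a tracker plus a wrapper that intercepts rejections from a wrapped step. It is not the code in this PR: `Step`, `Rejected`, `Tracked`, `Flaky`, and `submit_at` are hypothetical stand-ins for the rust_arroyo strategy trait and MessageRejected, and timestamps are passed in explicitly so the behavior is easy to check.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for MessageRejected: hands the message back to the caller.
#[derive(Debug, PartialEq)]
pub struct Rejected<T>(pub T);

/// Simplified stand-in for a processing strategy (not the real rust_arroyo trait).
pub trait Step<T> {
    fn submit(&mut self, message: T) -> Result<(), Rejected<T>>;
}

/// Tracks whether a step is currently under backpressure and for how long.
#[derive(Debug, Default)]
pub struct BackpressureTracker {
    episode_start: Option<Instant>,
    pub rejections: u64,
    pub finished_episodes: Vec<Duration>,
}

impl BackpressureTracker {
    /// Count the rejection; open an episode only if none is open yet.
    pub fn on_backpressure(&mut self, now: Instant) {
        self.rejections += 1;
        if self.episode_start.is_none() {
            self.episode_start = Some(now);
        }
    }

    /// Close the open episode, if any, and record its duration.
    pub fn on_success(&mut self, now: Instant) {
        if let Some(start) = self.episode_start.take() {
            self.finished_episodes.push(now.duration_since(start));
        }
    }
}

/// Wrapper that forwards to the inner step and updates the tracker on the way back.
pub struct Tracked<S> {
    inner: S,
    pub tracker: BackpressureTracker,
}

impl<S> Tracked<S> {
    pub fn new(inner: S) -> Self {
        Tracked { inner, tracker: BackpressureTracker::default() }
    }

    pub fn submit_at<T>(&mut self, message: T, now: Instant) -> Result<(), Rejected<T>>
    where
        S: Step<T>,
    {
        let result = self.inner.submit(message);
        match &result {
            Ok(()) => self.tracker.on_success(now),
            Err(_) => self.tracker.on_backpressure(now),
        }
        result
    }
}

/// Toy downstream step that rejects the first `reject_first` submits.
pub struct Flaky {
    pub reject_first: u32,
}

impl Step<u32> for Flaky {
    fn submit(&mut self, message: u32) -> Result<(), Rejected<u32>> {
        if self.reject_first > 0 {
            self.reject_first -= 1;
            Err(Rejected(message))
        } else {
            Ok(())
        }
    }
}
```

With `Flaky { reject_first: 2 }`, two rejected submits followed by an accepted one yield two counted rejections and a single finished episode spanning the first rejection to the first success.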

What would come next:

  • buffered metrics, to reduce the performance impact
  • a success rate, to compare successes with backpressure events
  • more metrics recorders (for example, a log-based one that periodically records stats)

Made with Cursor

Instrument ProcessingStrategy steps with counters and episode histograms for
MessageRejected: send_backpressure vs receive_backpressure and matching
duration series, using a BackpressureNext wrapper and explicit tracking
for StreamSink produce and PythonAdapter.

Each metric is labeled by step (operator kind and route). Incomplete
episodes at shutdown are not emitted.
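
A rough sketch of the labeling and the shutdown behavior described above, using an in-memory map instead of a real metrics backend. `InMemoryMetrics` and its methods are hypothetical, the step labels `map_1` and `sink_1` are made up, and counting every rejection (rather than every episode) is an assumption about the counter semantics.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// (metric name, step label), e.g. ("send_backpressure", "map_1").
type Key = (&'static str, String);

/// Hypothetical in-memory stand-in for a metrics backend.
#[derive(Default)]
pub struct InMemoryMetrics {
    counters: HashMap<Key, u64>,
    durations: HashMap<Key, Vec<Duration>>,
    open: HashMap<Key, Instant>,
}

impl InMemoryMetrics {
    /// Count one rejection and open an episode if none is open for this key.
    pub fn rejected(&mut self, metric: &'static str, step: &str, now: Instant) {
        let key = (metric, step.to_string());
        *self.counters.entry(key.clone()).or_insert(0) += 1;
        self.open.entry(key).or_insert(now);
    }

    /// Close the open episode for this key, if any, and record its duration.
    pub fn accepted(&mut self, metric: &'static str, step: &str, now: Instant) {
        let key = (metric, step.to_string());
        if let Some(start) = self.open.remove(&key) {
            self.durations
                .entry(key)
                .or_default()
                .push(now.duration_since(start));
        }
    }

    pub fn count(&self, metric: &'static str, step: &str) -> u64 {
        self.counters
            .get(&(metric, step.to_string()))
            .copied()
            .unwrap_or(0)
    }

    /// Only finished episodes appear here; an episode still open when the
    /// value is dropped never produces a duration sample.
    pub fn episodes(&self, metric: &'static str, step: &str) -> Vec<Duration> {
        self.durations
            .get(&(metric, step.to_string()))
            .cloned()
            .unwrap_or_default()
    }
}
```

In this model a `receive_backpressure` rejection that is never followed by a success still increments the counter, but contributes nothing to the duration series, which mirrors the "incomplete episodes are not emitted" rule.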

Co-Authored-By: Cursor <cursoragent@cursor.com>
Made-with: Cursor
@github-actions

github-actions bot commented Mar 30, 2026

Semver Impact of This PR

🟡 Minor (new features)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (streams) Add backpressure metrics for consumer strategies by fpacifici in #288

Internal Changes 🔧

Deps

  • Bump pygments from 2.19.2 to 2.20.0 in /sentry_streams by dependabot in #286
  • Bump requests from 2.32.4 to 2.33.0 in /sentry_streams by dependabot in #287

🤖 This preview updates automatically when you update the PR.

fpacifici and others added 2 commits March 31, 2026 00:30
Use DSL Step.name for the step label: add_step(step, step_name) on the
Rust consumer, parallel step_names in build_chain, and wire names from
rust_arroyo (including segment_label for chained maps).

Rename EpisodeTracker to BackpressureTracker. Document BackpressureNext::poll
vs submit for MessageRejected.

Co-Authored-By: Cursor <cursoragent@cursor.com>
Made-with: Cursor
Move pipeline step labels into RuntimeOperator so consumer assembly no longer
tracks a parallel step_names list. This lets Arroyo segment finalization pass a
single first-step label through operator construction for backpressure metrics.

Co-Authored-By: GPT-5 Codex <noreply@openai.com>
Made-with: Cursor
@fpacifici fpacifici force-pushed the fpacifici/add_backpressure branch from 7190cbe to 8f9c8ef Compare March 31, 2026 10:05
Route backpressure counters and duration histograms through STREAMS_RECORDER and shared streams.pipeline helpers. Restore DogStatsD global labels on the exporter.

Add unit tests using metrics with_local_recorder plus DebuggingRecorder, a merged StreamsMetricsRecorder test, and BackpressureNext wired to FakeStrategy for MessageRejected.

Co-Authored-By: GPT-5 Codex <noreply@openai.com>
Made-with: Cursor
@fpacifici fpacifici marked this pull request as ready for review March 31, 2026 10:50
@fpacifici fpacifici requested a review from a team as a code owner March 31, 2026 10:50

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


-            Ok(_) => {}
+            Ok(_) => {
+                recv_on_success(label, &mut self.produce_recv_tracker);
+            }

Missing send_on_success in poll inflates backpressure duration

Medium Severity

When StreamSink::poll successfully retries the carried-over message, recv_on_success is called for produce_recv_tracker but send_on_success is never called for send_tracker. The send_tracker timer started when a prior submit returned MessageRejected (due to message_carried_over being Some). Once poll clears the carried-over message, the step can accept messages again, but the timer keeps running until the next successful submit call. This inflates send_backpressure_duration by the time between the poll clearing the backlog and the next incoming message, which can be significant in low-throughput scenarios. Adding send_on_success(label, &mut self.send_tracker) alongside recv_on_success in the Ok(_) branch of poll would fix the measurement.
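
The timing issue can be reproduced in a small model. This is a simplified sketch, not the StreamSink implementation: `SinkModel`, `Tracker`, and `carry_over` are hypothetical, and only the field names `send_tracker` and `produce_recv_tracker` come from the comment above. The `poll` here includes the suggested fix of closing the send episode when the carried-over message clears.

```rust
use std::time::{Duration, Instant};

/// Minimal episode tracker used by the model below.
#[derive(Debug, Default)]
pub struct Tracker {
    start: Option<Instant>,
    pub recorded: Vec<Duration>,
}

impl Tracker {
    pub fn on_backpressure(&mut self, now: Instant) {
        if self.start.is_none() {
            self.start = Some(now);
        }
    }

    pub fn on_success(&mut self, now: Instant) {
        if let Some(start) = self.start.take() {
            self.recorded.push(now.duration_since(start));
        }
    }
}

/// Simplified model of a sink that holds at most one carried-over message.
#[derive(Debug, Default)]
pub struct SinkModel {
    carried_over: Option<u64>,
    pub send_tracker: Tracker,
    pub produce_recv_tracker: Tracker,
}

impl SinkModel {
    /// Simulate the producer rejecting `message`: keep it for a later retry.
    pub fn carry_over(&mut self, message: u64, now: Instant) {
        self.carried_over = Some(message);
        self.produce_recv_tracker.on_backpressure(now);
    }

    /// Reject new messages while one is carried over; this opens the send episode.
    pub fn submit(&mut self, message: u64, now: Instant) -> Result<(), u64> {
        if self.carried_over.is_some() {
            self.send_tracker.on_backpressure(now);
            return Err(message);
        }
        self.send_tracker.on_success(now);
        Ok(())
    }

    /// Retry the carried-over message (assumed to succeed in this model).
    pub fn poll(&mut self, now: Instant) {
        if self.carried_over.take().is_some() {
            self.produce_recv_tracker.on_success(now);
            // Suggested fix: the step can accept messages again from this
            // point, so the send episode ends here, not at the next submit.
            self.send_tracker.on_success(now);
        }
    }
}
```

Suppose a message is carried over at t0, a submit is rejected at t0 + 10 ms, `poll` clears the backlog at t0 + 50 ms, and the next submit arrives at t0 + 500 ms. With the extra call in `poll` the send episode is recorded as 40 ms; without it, the episode would only close at the next submit and be recorded as 490 ms.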


}

/// End a receive backpressure event after downstream accepted a submit.
pub fn recv_on_success(step: &str, tracker: &mut BackpressureTracker) {
Member


Why is this a separate function? Why not call the tracker directly in the BackpressureNext class?
