Add fast headers filter #290

Open
fpacifici wants to merge 2 commits into main from fpacifici/add_fast_filter

@fpacifici
Collaborator

I tested the headers filter used in the SBC implementation.
The filter appears to be very inefficient, seemingly because of the Python/Rust
interaction when the headers are handed to the Python code.
Most of the time seems to be spent here: https://github.com/getsentry/streams/blob/main/sentry_streams/src/messages.rs#L47-L82

I want to try SBC with a native Rust implementation of the header filter.
It will be used here: https://github.com/getsentry/super-big-consumers/blob/master/consumer/streaming_platform/pipelines/items_span.py#L28
where the filtering logic is trivial (check that the value of a header equals an integer).

The goal is to see whether the spans pipeline still has that bottleneck.
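
For reference, the check described above can be sketched in plain Python as follows. This is only an illustration of the logic, not the code in this PR: the function name `header_int_equals` and the header name `item_type` are hypothetical, and the sketch assumes the header value is an ASCII-encoded decimal integer, which the PR does not state. Headers are modelled as `(str, bytes)` pairs, matching the `Vec<(String, Vec<u8>)>` shape used on the Rust side.

```python
from typing import Sequence, Tuple

# Headers modelled as (name, raw value) pairs.
Headers = Sequence[Tuple[str, bytes]]


def header_int_equals(headers: Headers, name: str, expected: int) -> bool:
    """Return True if the first header called `name` parses to `expected`.

    Hypothetical helper for illustration only.
    """
    for key, raw in headers:
        if key != name:
            continue
        try:
            # Assumption: the value is an ASCII decimal integer.
            return int(raw.decode("ascii")) == expected
        except (UnicodeDecodeError, ValueError):
            # An unparseable value is treated as "no match".
            return False
    # Header not present at all.
    return False


if __name__ == "__main__":
    print(header_int_equals([("item_type", b"1")], "item_type", 1))
```

Running this predicate per message from Python still requires the headers to cross the Rust/Python boundary, which is the cost the native implementation is meant to avoid.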

@fpacifici fpacifici requested a review from a team as a code owner April 1, 2026 14:13
@github-actions

github-actions bot commented Apr 1, 2026

Semver Impact of This PR

None (no version bump detected)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


Internal Changes 🔧

Deps

  • Bump rustls-webpki from 0.103.3 to 0.103.10 in /sentry_streams/sentry_streams/examples/rust_simple_map_filter/rust_transforms by dependabot in #278
  • Bump slab from 0.4.10 to 0.4.12 in /sentry_streams/sentry_streams/examples/rust_simple_map_filter/rust_transforms by dependabot in #281
  • Bump rustls-webpki from 0.103.3 to 0.103.10 in /sentry_streams/tests/rust_test_functions by dependabot in #283

Other

  • Add fast headers filter by fpacifici in #290

🤖 This preview updates automatically when you update the PR.

raise NotImplementedError

def filter(self, step: Filter[Any], stream: Route) -> Route:
def filter(self, step: Filter[Any] | HeaderIntFilter[Any], stream: Route) -> Route:
Collaborator Author

This requires a bit of refactoring. We do not want to have a large Union type covering all filter types.

Let's refactor it this way:

  • Filter becomes an abstract class that only defines step_type: StepType = StepType.FILTER.
  • FunctionFilter extends Filter, replaces what is Filter today, and specifies the function to execute.
  • HeadersFilter extends Filter and is what HeaderIntFilter is now.
  • This method takes a Filter object and performs the isinstance check inside to decide which implementation to use.
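
A rough sketch of the hierarchy proposed above, to make the shape concrete. Everything here is a simplified stand-in: `StepType` is reduced to one member, the field names `function`, `header` and `expected` are assumptions, and `choose_implementation` is a hypothetical function that returns a label where the real adapter method would build a strategy.

```python
from abc import ABC
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Generic, TypeVar

TIn = TypeVar("TIn")


class StepType(Enum):
    # Stand-in for the real StepType enum; only the member needed here.
    FILTER = "filter"


class Filter(ABC, Generic[TIn]):
    """Abstract base: only defines the step type."""

    step_type: StepType = StepType.FILTER


@dataclass
class FunctionFilter(Filter[TIn]):
    """Filter driven by a Python callable (what Filter is today)."""

    function: Callable[[TIn], bool]


@dataclass
class HeadersFilter(Filter[TIn]):
    """Filter on integer header equality (field names are assumptions)."""

    header: str
    expected: int


def choose_implementation(step: Filter[Any]) -> str:
    """Hypothetical dispatch: one Filter parameter, isinstance inside."""
    if isinstance(step, HeadersFilter):
        return "native_header_int"
    if isinstance(step, FunctionFilter):
        return "python_callable"
    raise TypeError(f"Unsupported filter type: {type(step)}")


if __name__ == "__main__":
    print(choose_implementation(HeadersFilter(header="item_type", expected=1)))
```

With this layout the method signature stays `step: Filter[Any]` no matter how many filter kinds are added later, and an adapter that does not support a given subclass can raise from its own branch.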

Comment on lines +209 to +212
assert stream is not None
assert isinstance(
    step, (Filter, HeaderIntFilter)
), f"Expected Filter or HeaderIntFilter, got {type(step)}"
Collaborator Author

If we perform the refactoring described here https://github.com/getsentry/streams/pull/290/changes#r3022430003, this assertion is not needed.



@dataclass
class HeaderIntFilter(Transform[TIn, TIn], Generic[TIn]):
Collaborator Author

Please see the refactoring explained here https://github.com/getsentry/streams/pull/290/changes#r3022430003

Comment on lines +37 to +42
fn streaming_message_headers(msg: &PyStreamingMessage) -> Vec<(String, Vec<u8>)> {
    traced_with_gil!(|py| match msg {
        PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().headers.clone(),
        PyStreamingMessage::RawMessage { content } => content.bind(py).borrow().headers.clone(),
    })
}
Collaborator Author

Having to take the GIL here is annoying, but there is nothing we can do about it.

…lter

Introduce an abstract Filter step with PredicateFilter (callable predicate) and HeadersFilter (integer Kafka header equality) subclasses. Wire HeadersFilter to RuntimeOperator.HeaderIntFilter in rust_arroyo; the pure Python Arroyo adapter raises NotImplementedError for that path.

Propagate join() results from Rust filter strategies so downstream commit requests are not dropped on shutdown.

Add mypy import-not-found suppressions for optional metrics_rust_transforms and rust_test_functions extension modules used by examples and integration tests.

Co-Authored-By: Cursor <cursoragent@cursor.com>
Made-with: Cursor
Member

@evanh evanh left a comment

Seems like a fair test to me.
