Conversation
Semver Impact of This PR⚪ None (no version bump detected) 📋 Changelog PreviewThis is how your changes will appear in the changelog. Internal Changes 🔧Deps
Other
🤖 This preview updates automatically when you update the PR. |
| raise NotImplementedError | ||
|
|
||
| def filter(self, step: Filter[Any], stream: Route) -> Route: | ||
| def filter(self, step: Filter[Any] | HeaderIntFilter[Any], stream: Route) -> Route: |
There was a problem hiding this comment.
This requires a bit of refactoring. We do not want To have a large Union type for all filter types.
Let's refactor it this way:
- Filter becomes an abstract class that only defines
step_type: StepType = StepType.FILTER. - FunctionFilter extends Filter replaces what today is Filter and specifies the function to execute
- HeadersFilter extends Filter and is the
HeaderIntFilter - This method takes a
Filterobject and perfoms the instanceof inside to decide which implementation to use.
| assert stream is not None | ||
| assert isinstance( | ||
| step, (Filter, HeaderIntFilter) | ||
| ), f"Expected Filter or HeaderIntFilter, got {type(step)}" |
There was a problem hiding this comment.
IF we perform the refactoring defined here https://github.com/getsentry/streams/pull/290/changes#r3022430003, this is not needed.
|
|
||
|
|
||
| @dataclass | ||
| class HeaderIntFilter(Transform[TIn, TIn], Generic[TIn]): |
There was a problem hiding this comment.
Please see the refactoring explained here https://github.com/getsentry/streams/pull/290/changes#r3022430003
| fn streaming_message_headers(msg: &PyStreamingMessage) -> Vec<(String, Vec<u8>)> { | ||
| traced_with_gil!(|py| match msg { | ||
| PyStreamingMessage::PyAnyMessage { content } => content.bind(py).borrow().headers.clone(), | ||
| PyStreamingMessage::RawMessage { content } => content.bind(py).borrow().headers.clone(), | ||
| }) | ||
| } |
There was a problem hiding this comment.
The fact we have to take the gil here is annoying, but there is nothing we can do about it .
…lter Introduce an abstract Filter step with PredicateFilter (callable predicate) and HeadersFilter (integer Kafka header equality) subclasses. Wire HeadersFilter to RuntimeOperator.HeaderIntFilter in rust_arroyo; the pure Python Arroyo adapter raises NotImplementedError for that path. Propagate join() results from Rust filter strategies so downstream commit requests are not dropped on shutdown. Add mypy import-not-found suppressions for optional metrics_rust_transforms and rust_test_functions extension modules used by examples and integration tests. Co-Authored-By: Cursor <cursoragent@cursor.com> Made-with: Cursor
I tested the headers filter used in the SBC implementation.
It seems that filter is terribly inefficient. This seemingly is due to the python/rust
interaction when providing the headers to the python code.
We seem to be spending most of the time here https://github.com/getsentry/streams/blob/main/sentry_streams/src/messages.rs#L47-L82
I want to try SBC with a native rust implementation of the header filter.
This is going to be used here https://github.com/getsentry/super-big-consumers/blob/master/consumer/streaming_platform/pipelines/items_span.py#L28
where the filtering logic is trivial (check that the value of a header equals an integer).
The goal is to see if the spans pipeline does not have that bottleneck anymore