feat(span-streaming): Add span batcher (2) #5398
base: master
Changes from all commits
bab4359
37c990e
e720dec
6738f30
3fd25de
8c52a9c
65e667c
ad47675
New file (118 lines added), @@ -0,0 +1,118 @@:

```python
import threading
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk.consts import SPANSTATUS
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr

if TYPE_CHECKING:
    from typing import Any, Callable, Optional
    from sentry_sdk.traces import StreamedSpan
    from sentry_sdk._types import SerializedAttributeValue


class SpanBatcher(Batcher["StreamedSpan"]):
    # TODO[span-first]: size-based flushes
    # TODO[span-first]: adjust flush/drop defaults
    MAX_BEFORE_FLUSH = 1000
    MAX_BEFORE_DROP = 5000
    FLUSH_WAIT_TIME = 5.0

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we
        # bucket by trace_id, so that we can then send each bucket in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def get_size(self) -> int:
        # caller is responsible for locking before checking this
        return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, span: "StreamedSpan") -> None:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = self.get_size()
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    @staticmethod
    def _to_transport_format(item: "StreamedSpan") -> "Any":
        # TODO[span-first]
        res: "dict[str, Any]" = {}
        return res

    def _flush(self) -> None:
        with self._lock:
            if len(self._span_buffer) == 0:
                return None

            envelopes = []
            for trace_id, spans in self._span_buffer.items():
                if spans:
                    # TODO[span-first]
                    # dsc = spans[0].dynamic_sampling_context()
                    dsc = None

                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
                            "trace": dsc,
                        }
                    )
```
Review comment (Low Severity): Envelope header unconditionally includes null trace context.

Reply (PR author, contributor): All coming in future PRs.
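One possible shape for that follow-up (a sketch only, not part of this PR, and assuming the dynamic sampling context ends up being a plain dict): attach the `trace` header only when a DSC is actually available, so envelopes never carry `"trace": null`. This is a drop-in fragment for the `_flush` body above:

```python
# Sketch: only include the trace header when a dynamic sampling context exists.
headers: "dict[str, Any]" = {
    "sent_at": format_timestamp(datetime.now(timezone.utc)),
}
if dsc is not None:
    headers["trace"] = dsc

envelope = Envelope(headers=headers)
```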
The hunk continues:

```python
                    envelope.add_item(
                        Item(
                            type="span",
                            content_type="application/vnd.sentry.items.span.v2+json",
                            headers={
                                "item_count": len(spans),
                            },
                            payload=PayloadRef(
                                json={
                                    "items": [
                                        self._to_transport_format(span)
                                        for span in spans
                                    ]
                                }
                            ),
                        )
                    )

                    envelopes.append(envelope)

            self._span_buffer.clear()

        for envelope in envelopes:
            self._capture_func(envelope)
```
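For orientation, a rough sketch of how this batcher could be driven once wired up. The two callbacks below are stand-ins (the real hookup into the client and transport is not part of this diff), and `SpanBatcher` and `StreamedSpan` are assumed to be in scope from the files in this PR:

```python
from sentry_sdk.envelope import Envelope


def capture(envelope: "Envelope") -> None:
    # Stand-in for the transport's envelope capture function.
    print("would send envelope with", len(envelope.items), "item(s)")


def record_lost(*, reason: str, data_category: str, quantity: int) -> None:
    # Stand-in for the client's lost-event accounting.
    print(f"dropped {quantity} {data_category}(s): {reason}")


batcher = SpanBatcher(capture_func=capture, record_lost_func=record_lost)
batcher.add(StreamedSpan(trace_id="a" * 32))  # buffered under its trace_id
batcher.add(StreamedSpan())  # lazily generated trace_id -> separate envelope on flush
```

Because the buffer is keyed by trace_id, the two spans above would end up in two different envelopes, each with its own trace header.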
New file (39 lines added), @@ -0,0 +1,39 @@:

```python
"""
The API in this file is only meant to be used in span streaming mode.

You can enable span streaming mode via
sentry_sdk.init(_experiments={"trace_lifecycle": "stream"}).
"""

import uuid
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Optional


class StreamedSpan:
    """
    A span holds timing information of a block of code.

    Spans can have multiple child spans thus forming a span tree.

    This is the Span First span implementation. The original transaction-based
    span implementation lives in tracing.Span.
    """

    __slots__ = ("_trace_id",)

    def __init__(
        self,
        *,
        trace_id: "Optional[str]" = None,
    ):
        self._trace_id = trace_id

    @property
    def trace_id(self) -> str:
        if not self._trace_id:
            self._trace_id = uuid.uuid4().hex

        return self._trace_id
```
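Based only on the class above, the lazy trace_id behavior works like this (small usage sketch):

```python
span = StreamedSpan()
generated = span.trace_id           # generated on first access (uuid4 hex, 32 chars)
assert span.trace_id == generated   # stable on subsequent accesses

explicit = StreamedSpan(trace_id="0123456789abcdef0123456789abcdef")
assert explicit.trace_id == "0123456789abcdef0123456789abcdef"
```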
Review comment (re: the "caller is responsible for locking" note on `get_size()`): why can we not, or should we not, lock for the caller here?
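One plausible answer, not stated in the diff: `threading.Lock` is not re-entrant, so if `get_size()` took the lock itself, the call from inside `add()`'s `with self._lock:` block would deadlock. A re-entrant lock would let `get_size()` lock for the caller, at the cost of a little extra locking overhead. A minimal standalone sketch (not the PR's code):

```python
import threading
from collections import defaultdict


class _RLockSketch:
    """Sketch only: get_size() locks for the caller via a re-entrant lock."""

    def __init__(self) -> None:
        self._span_buffer: "dict[str, list[object]]" = defaultdict(list)
        self._lock = threading.RLock()  # re-entrant, unlike threading.Lock

    def get_size(self) -> int:
        with self._lock:
            return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, trace_id: str, span: object) -> None:
        with self._lock:
            self._span_buffer[trace_id].append(span)
            # The same thread re-acquires the RLock here without deadlocking.
            print("buffered spans:", self.get_size())
```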