diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py
new file mode 100644
index 0000000000..6f3f11c2f3
--- /dev/null
+++ b/sentry_sdk/_span_batcher.py
@@ -0,0 +1,120 @@
+import threading
+from collections import defaultdict
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING
+
+from sentry_sdk._batcher import Batcher
+from sentry_sdk.consts import SPANSTATUS
+from sentry_sdk.envelope import Envelope, Item, PayloadRef
+from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr
+
+if TYPE_CHECKING:
+    from typing import Any, Callable, Optional
+    from sentry_sdk.traces import StreamedSpan
+    from sentry_sdk._types import SerializedAttributeValue
+
+
+class SpanBatcher(Batcher["StreamedSpan"]):
+    # TODO[span-first]: size-based flushes
+    # TODO[span-first]: adjust flush/drop defaults
+    MAX_BEFORE_FLUSH = 1000
+    MAX_BEFORE_DROP = 5000
+    FLUSH_WAIT_TIME = 5.0
+
+    TYPE = "span"
+    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"
+
+    def __init__(
+        self,
+        capture_func: "Callable[[Envelope], None]",
+        record_lost_func: "Callable[..., None]",
+    ) -> None:
+        # Spans from different traces cannot be emitted in the same envelope,
+        # since an envelope carries a single shared trace header. We therefore
+        # bucket spans by trace_id and send each bucket in its own
+        # envelope.
+        # trace_id -> span buffer
+        self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
+        self._capture_func = capture_func
+        self._record_lost_func = record_lost_func
+        self._running = True
+        self._lock = threading.Lock()
+
+        self._flush_event: "threading.Event" = threading.Event()
+
+        self._flusher: "Optional[threading.Thread]" = None
+        self._flusher_pid: "Optional[int]" = None
+
+    def get_size(self) -> int:
+        # The caller is responsible for holding self._lock around this call.
+        return sum(len(buffer) for buffer in self._span_buffer.values())
+
+    def add(self, span: "StreamedSpan") -> None:
+        if not self._ensure_thread() or self._flusher is None:
+            return None
+
+        with self._lock:
+            size = self.get_size()
+            if size >= self.MAX_BEFORE_DROP:
+                self._record_lost_func(
+                    reason="queue_overflow",
+                    data_category="span",
+                    quantity=1,
+                )
+                return None
+
+            self._span_buffer[span.trace_id].append(span)
+            # size was read before the append, hence the +1
+            if size + 1 >= self.MAX_BEFORE_FLUSH:
+                self._flush_event.set()
+
+    @staticmethod
+    def _to_transport_format(item: "StreamedSpan") -> "Any":
+        # TODO[span-first]
+        res: "dict[str, Any]" = {}
+        return res
+
+    def _flush(self) -> None:
+        with self._lock:
+            if len(self._span_buffer) == 0:
+                return None
+
+            envelopes = []
+            for trace_id, spans in self._span_buffer.items():
+                if spans:
+                    # TODO[span-first]
+                    # dsc = spans[0].dynamic_sampling_context()
+                    dsc = None
+
+                    envelope = Envelope(
+                        headers={
+                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
+                            "trace": dsc,
+                        }
+                    )
+
+                    envelope.add_item(
+                        Item(
+                            type=self.TYPE,
+                            content_type=self.CONTENT_TYPE,
+                            headers={
+                                "item_count": len(spans),
+                            },
+                            payload=PayloadRef(
+                                json={
+                                    "items": [
+                                        self._to_transport_format(span)
+                                        for span in spans
+                                    ]
+                                }
+                            ),
+                        )
+                    )
+
+                    envelopes.append(envelope)
+
+            self._span_buffer.clear()
+
+        # Capture outside the lock so a slow transport does not block add().
+        for envelope in envelopes:
+            self._capture_func(envelope)
diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py
index fb14d8e36a..c0681fa0a6 100644
--- a/sentry_sdk/client.py
+++ b/sentry_sdk/client.py
@@ -11,6 +11,7 @@ import sentry_sdk
 from sentry_sdk._compat import PY37, check_uwsgi_thread_support
 from sentry_sdk._metrics_batcher import MetricsBatcher
+from sentry_sdk._span_batcher import SpanBatcher
 from sentry_sdk.utils import (
     AnnotatedValue,
     ContextVar,
@@ -31,6 +32,7 @@
 )
 from sentry_sdk.serializer import serialize
 from sentry_sdk.tracing import trace
+from sentry_sdk.tracing_utils import has_span_streaming_enabled
 from sentry_sdk.transport import BaseHttpTransport, make_transport
 from sentry_sdk.consts import (
     SPANDATA,
@@ -188,6 +190,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
         self.monitor: "Optional[Monitor]" = None
         self.log_batcher: "Optional[LogBatcher]" = None
         self.metrics_batcher: "Optional[MetricsBatcher]" = None
+        self.span_batcher: "Optional[SpanBatcher]" = None
         self.integrations: "dict[str, Integration]" = {}
 
     def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
@@ -399,6 +402,13 @@ def _record_lost_event(
                 record_lost_func=_record_lost_event,
             )
 
+        self.span_batcher = None
+        if has_span_streaming_enabled(self.options):
+            self.span_batcher = SpanBatcher(
+                capture_func=_capture_envelope,
+                record_lost_func=_record_lost_event,
+            )
+
         max_request_body_size = ("always", "never", "small", "medium")
         if self.options["max_request_body_size"] not in max_request_body_size:
             raise ValueError(
@@ -909,7 +919,10 @@ def capture_event(
         return return_value
 
     def _capture_telemetry(
-        self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
+        self,
+        telemetry: "Optional[Union[Log, Metric]]",
+        ty: str,
+        scope: "Scope",
     ) -> None:
         # Capture attributes-based telemetry (logs, metrics, spansV2)
         if telemetry is None:
@@ -993,6 +1006,8 @@ def close(
                 self.log_batcher.kill()
             if self.metrics_batcher is not None:
                 self.metrics_batcher.kill()
+            if self.span_batcher is not None:
+                self.span_batcher.kill()
             if self.monitor:
                 self.monitor.kill()
             self.transport.kill()
@@ -1018,6 +1033,8 @@ def flush(
                 self.log_batcher.flush()
             if self.metrics_batcher is not None:
                 self.metrics_batcher.flush()
+            if self.span_batcher is not None:
+                self.span_batcher.flush()
             self.transport.flush(timeout=timeout, callback=callback)
 
     def __enter__(self) -> "_Client":
diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py
index f794754b05..f294414edf 100644
--- a/sentry_sdk/consts.py
+++ b/sentry_sdk/consts.py
@@ -82,6 +82,7 @@ class CompressionAlgo(Enum):
         "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
         "enable_metrics": Optional[bool],
         "before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
+        "trace_lifecycle": Optional[Literal["static", "stream"]],
     },
     total=False,
 )
diff --git a/sentry_sdk/traces.py b/sentry_sdk/traces.py
new file mode 100644
index 0000000000..d2f6549e83
--- /dev/null
+++ b/sentry_sdk/traces.py
@@ -0,0 +1,39 @@
+"""
+The API in this file is only meant to be used in span streaming mode.
+
+You can enable span streaming mode via
+sentry_sdk.init(_experiments={"trace_lifecycle": "stream"}).
+"""
+
+import uuid
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from typing import Optional
+
+
+class StreamedSpan:
+    """
+    A span holds timing information about a block of code.
+
+    Spans can have multiple child spans, thus forming a span tree.
+
+    This is the Span First span implementation. The original transaction-based
+    span implementation lives in tracing.Span.
+ """ + + __slots__ = ("_trace_id",) + + def __init__( + self, + *, + trace_id: "Optional[str]" = None, + ): + self._trace_id = trace_id + + @property + def trace_id(self) -> str: + if not self._trace_id: + self._trace_id = uuid.uuid4().hex + + return self._trace_id diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 742582423b..c1d6c44535 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -106,6 +106,13 @@ def has_tracing_enabled(options: "Optional[Dict[str, Any]]") -> bool: ) +def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool: + if options is None: + return False + + return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream" + + @contextlib.contextmanager def record_sql_queries( cursor: "Any",