From 757ee12698ee149120ea67af2a48c2e9e2b177b6 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 15 Jan 2026 13:44:50 +0100 Subject: [PATCH 1/5] feat: Span streaming & new span API --- sentry_sdk/_span_batcher.py | 136 +++++++++++++++ sentry_sdk/_types.py | 22 +++ sentry_sdk/client.py | 24 ++- sentry_sdk/consts.py | 5 + sentry_sdk/envelope.py | 2 + sentry_sdk/scope.py | 38 +++++ sentry_sdk/trace.py | 322 ++++++++++++++++++++++++++++++++++++ sentry_sdk/tracing_utils.py | 7 + 8 files changed, 553 insertions(+), 3 deletions(-) create mode 100644 sentry_sdk/_span_batcher.py create mode 100644 sentry_sdk/trace.py diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py new file mode 100644 index 0000000000..35ed09148e --- /dev/null +++ b/sentry_sdk/_span_batcher.py @@ -0,0 +1,136 @@ +import threading +from collections import defaultdict +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from sentry_sdk._batcher import Batcher +from sentry_sdk.consts import SPANSTATUS +from sentry_sdk.envelope import Envelope, Item, PayloadRef +from sentry_sdk.utils import serialize_attribute, safe_repr + +if TYPE_CHECKING: + from typing import Any, Callable, Optional + from sentry_sdk.trace import Span + + +class SpanBatcher(Batcher["Span"]): + # TODO[span-first]: size-based flushes + MAX_BEFORE_FLUSH = 1000 + MAX_BEFORE_DROP = 5000 + FLUSH_WAIT_TIME = 5.0 + + TYPE = "span" + CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json" + + def __init__( + self, + capture_func: "Callable[[Envelope], None]", + record_lost_func: "Callable[..., None]", + ) -> None: + # Spans from different traces cannot be emitted in the same envelope + # since the envelope contains a shared trace header. That's why we bucket + # by trace_id, so that we can then send the buckets each in its own + # envelope. + # trace_id -> span buffer + self._span_buffer: dict[str, list["Span"]] = defaultdict(list) + self._capture_func = capture_func + self._record_lost_func = record_lost_func + self._running = True + self._lock = threading.Lock() + + self._flush_event: "threading.Event" = threading.Event() + + self._flusher: "Optional[threading.Thread]" = None + self._flusher_pid: "Optional[int]" = None + + def get_size(self) -> int: + # caller is responsible for locking before checking this + return sum(len(buffer) for buffer in self._span_buffer.values()) + + def add(self, span: Span) -> None: + if not self._ensure_thread() or self._flusher is None: + return None + + with self._lock: + size = self.get_size() + if size >= self.MAX_BEFORE_DROP: + self._record_lost_func( + reason="queue_overflow", + data_category="span", + quantity=1, + ) + return None + + self._span_buffer[span.trace_id].append(span) + if size + 1 >= self.MAX_BEFORE_FLUSH: + self._flush_event.set() + + @staticmethod + def _to_transport_format(item: "Span") -> "Any": + is_segment = item.containing_transaction == item + + res = { + "trace_id": item.trace_id, + "span_id": item.span_id, + "name": item.name if is_segment else item.description, + "status": SPANSTATUS.OK + if item.status == SPANSTATUS.OK + else SPANSTATUS.INTERNAL_ERROR, + "is_segment": is_segment, + "start_timestamp": item.start_timestamp.timestamp(), # TODO[span-first] + "end_timestamp": item.timestamp.timestamp(), + } + + if item.parent_span_id: + res["parent_span_id"] = item.parent_span_id + + if item._attributes: + res["attributes"] = { + k: serialize_attribute(v) for (k, v) in item._attributes.items() + } + + return res + + def _flush(self): + # type: (...) -> Optional[Envelope] + from sentry_sdk.utils import format_timestamp + + with self._lock: + if len(self._span_buffer) == 0: + return None + + for trace_id, spans in self._span_buffer.items(): + if spans: + trace_context = spans[0].get_trace_context() + dsc = trace_context.get("dynamic_sampling_context") + # XXX[span-first]: empty dsc? + + envelope = Envelope( + headers={ + "sent_at": format_timestamp(datetime.now(timezone.utc)), + "trace": dsc, + } + ) + + envelope.add_item( + Item( + type="span", + content_type="application/vnd.sentry.items.span.v2+json", + headers={ + "item_count": len(spans), + }, + payload=PayloadRef( + json={ + "items": [ + self._to_transport_format(span) + for span in spans + ] + } + ), + ) + ) + + self._span_buffer.clear() + + self._capture_func(envelope) + return envelope diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index 0ae3e653a7..5f17aaab11 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -1,5 +1,7 @@ from typing import TYPE_CHECKING, TypeVar, Union +from sentry_sdk.consts import SPANSTATUS + # Re-exported for compat, since code out there in the wild might use this variable. MYPY = TYPE_CHECKING @@ -274,6 +276,26 @@ class SDKInfo(TypedDict): MetricProcessor = Callable[[Metric, Hint], Optional[Metric]] + SpanV2Status = Literal[SPANSTATUS.OK, SPANSTATUS.ERROR] + # This is the V2 span format + # https://develop.sentry.dev/sdk/telemetry/spans/span-protocol/ + SpanV2 = TypedDict( + "SpanV2", + { + "trace_id": str, + "span_id": str, + "parent_span_id": Optional[str], + "name": str, + "status": SpanV2Status, + "is_segment": bool, + "start_timestamp": float, + "end_timestamp": float, + "attributes": Attributes, + }, + ) + + TraceLifecycleMode = Literal["static", "stream"] + # TODO: Make a proper type definition for this (PRs welcome!) Breadcrumb = Dict[str, Any] diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index fb14d8e36a..0896027db2 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -11,6 +11,7 @@ import sentry_sdk from sentry_sdk._compat import PY37, check_uwsgi_thread_support from sentry_sdk._metrics_batcher import MetricsBatcher +from sentry_sdk._span_batcher import SpanBatcher from sentry_sdk.utils import ( AnnotatedValue, ContextVar, @@ -31,6 +32,7 @@ ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.transport import BaseHttpTransport, make_transport from sentry_sdk.consts import ( SPANDATA, @@ -67,6 +69,7 @@ from sentry_sdk.scope import Scope from sentry_sdk.session import Session from sentry_sdk.spotlight import SpotlightClient + from sentry_sdk.trace import StreamedSpan from sentry_sdk.transport import Transport, Item from sentry_sdk._log_batcher import LogBatcher from sentry_sdk._metrics_batcher import MetricsBatcher @@ -188,6 +191,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None: self.monitor: "Optional[Monitor]" = None self.log_batcher: "Optional[LogBatcher]" = None self.metrics_batcher: "Optional[MetricsBatcher]" = None + self.span_batcher: "Optional[SpanBatcher]" = None self.integrations: "dict[str, Integration]" = {} def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any": @@ -399,6 +403,12 @@ def _record_lost_event( record_lost_func=_record_lost_event, ) + self.span_batcher = None + if has_span_streaming_enabled(self.options): + self.span_batcher = SpanBatcher( + capture_func=_capture_envelope, record_lost_func=_record_lost_event + ) + max_request_body_size = ("always", "never", "small", "medium") if self.options["max_request_body_size"] not in max_request_body_size: raise ValueError( @@ -909,7 +919,10 @@ def capture_event( return return_value def _capture_telemetry( - self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope" + self, + telemetry: "Optional[Union[Log, Metric, StreamedSpan]]", + ty: str, + scope: "Scope", ) -> None: # Capture attributes-based telemetry (logs, metrics, spansV2) if telemetry is None: @@ -921,7 +934,7 @@ def _capture_telemetry( if ty == "log": before_send = get_before_send_log(self.options) elif ty == "metric": - before_send = get_before_send_metric(self.options) # type: ignore + before_send = get_before_send_metric(self.options) if before_send is not None: telemetry = before_send(telemetry, {}) # type: ignore @@ -933,7 +946,9 @@ def _capture_telemetry( if ty == "log": batcher = self.log_batcher elif ty == "metric": - batcher = self.metrics_batcher # type: ignore + batcher = self.metrics_batcher + elif ty == "span": + batcher = self.span_batcher if batcher is not None: batcher.add(telemetry) # type: ignore @@ -944,6 +959,9 @@ def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None: def _capture_metric(self, metric: "Optional[Metric]", scope: "Scope") -> None: self._capture_telemetry(metric, "metric", scope) + def _capture_span(self, span: "Optional[StreamedSpan]", scope: "Scope") -> None: + self._capture_telemetry(span, "span", scope) + def capture_session( self, session: "Session", diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 59d3997c9a..cd1e8243e5 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -2,6 +2,8 @@ from enum import Enum from typing import TYPE_CHECKING +from sentry_sdk._types import TraceLifecycleMode + # up top to prevent circular import due to integration import # This is more or less an arbitrary large-ish value for now, so that we allow # pretty long strings (like LLM prompts), but still have *some* upper limit @@ -82,6 +84,7 @@ class CompressionAlgo(Enum): "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], "enable_metrics": Optional[bool], "before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]], + "trace_lifecycle": Optional[TraceLifecycleMode], }, total=False, ) @@ -877,6 +880,8 @@ class SPANSTATUS: UNIMPLEMENTED = "unimplemented" UNKNOWN_ERROR = "unknown_error" + ERROR = "error" # span-first specific + class OP: ANTHROPIC_MESSAGES_CREATE = "ai.messages.create.anthropic" diff --git a/sentry_sdk/envelope.py b/sentry_sdk/envelope.py index 307fb26fd6..5e52c6196f 100644 --- a/sentry_sdk/envelope.py +++ b/sentry_sdk/envelope.py @@ -253,6 +253,8 @@ def data_category(self) -> "EventDataCategory": return "session" elif ty == "attachment": return "attachment" + elif ty == "span": + return "span" elif ty == "transaction": return "transaction" elif ty == "event": diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index 6df26690c8..eee951b7f9 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -32,6 +32,7 @@ normalize_incoming_data, PropagationContext, ) +from sentry_sdk.trace import StreamedSpan from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, @@ -1147,6 +1148,40 @@ def start_span( return span + def start_streamed_span( + self, + name: str, + attributes: "Optional[Attributes]" = None, + parent_span: "Optional[StreamedSpan]" = None, + ) -> "StreamedSpan": + # TODO: rename to start_span once we drop the old API + with new_scope(): + if parent_span is None: + # get current span or transaction + parent_span = self.span or self.get_isolation_scope().span + + if parent_span is None: + # New spans get the `trace_id` from the scope + propagation_context = self.get_active_propagation_context() + span = StreamedSpan( + name=name, + attributes=attributes, + trace_id=propagation_context.trace_id, + scope=self, + ) + else: + # Children take propagation context from the parent span + span = StreamedSpan( + name=name, + attributes=attributes, + trace_id=parent_span.trace_id, + parent_span_id=parent_span.span_id, + segment=parent_span.segment, + scope=self, + ) + + return span + def continue_trace( self, environ_or_headers: "Dict[str, Any]", @@ -1180,6 +1215,9 @@ def continue_trace( **optional_kwargs, ) + def set_propagation_context(self, environ_or_headers: "dict[str, Any]") -> None: + self.generate_propagation_context(environ_or_headers) + def capture_event( self, event: "Event", diff --git a/sentry_sdk/trace.py b/sentry_sdk/trace.py new file mode 100644 index 0000000000..90bfe69205 --- /dev/null +++ b/sentry_sdk/trace.py @@ -0,0 +1,322 @@ +import uuid +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import TYPE_CHECKING + +import sentry_sdk +from sentry_sdk.consts import SPANDATA, SPANSTATUS +from sentry_sdk.profiler.continuous_profiler import get_profiler_id +from sentry_sdk.tracing import Span +from sentry_sdk.tracing_utils import has_span_streaming_enabled, has_tracing_enabled +from sentry_sdk.utils import ( + capture_internal_exceptions, + format_attribute, + get_current_thread_meta, + logger, + nanosecond_time, + should_be_treated_as_error, +) + +if TYPE_CHECKING: + from typing import Any, Optional, Union + from sentry_sdk._types import Attributes, AttributeValue + from sentry_sdk.scope import Scope + + +FLAGS_CAPACITY = 10 + +""" +TODO[span-first] / notes +- redis, http, subprocess breadcrumbs (maybe_create_breadcrumbs_from_span) work + on op, change or ignore? +- @trace +- tags +- initial status: OK? or unset? +- dropped spans are not migrated +- recheck transaction.finish <-> Streamedspan.end +- profile not part of the event, how to send? + +Notes: +- removed ability to provide a start_timestamp +- moved _flags_capacity to a const +""" + + +def start_span( + name: str, + attributes: "Optional[Attributes]" = None, + parent_span: "Optional[Span]" = None, +) -> Span: + return sentry_sdk.get_current_scope().start_streamed_span() + + +BAGGAGE_HEADER_NAME = "baggage" +SENTRY_TRACE_HEADER_NAME = "sentry-trace" + + +# Segment source, see +# https://getsentry.github.io/sentry-conventions/generated/attributes/sentry.html#sentryspansource +class SegmentSource(str, Enum): + COMPONENT = "component" + CUSTOM = "custom" + ROUTE = "route" + TASK = "task" + URL = "url" + VIEW = "view" + + def __str__(self) -> str: + return self.value + + +# These are typically high cardinality and the server hates them +LOW_QUALITY_SEGMENT_SOURCES = [ + SegmentSource.URL, +] + +SOURCE_FOR_STYLE = { + "endpoint": SegmentSource.COMPONENT, + "function_name": SegmentSource.COMPONENT, + "handler_name": SegmentSource.COMPONENT, + "method_and_path_pattern": SegmentSource.ROUTE, + "path": SegmentSource.URL, + "route_name": SegmentSource.COMPONENT, + "route_pattern": SegmentSource.ROUTE, + "uri_template": SegmentSource.ROUTE, + "url": SegmentSource.ROUTE, +} + + +class StreamedSpan: + """ + A span holds timing information of a block of code. + + Spans can have multiple child spans thus forming a span tree. + + This is the Span First span implementation. The original transaction-based + span implementation lives in tracing.Span. + """ + + __slots__ = ( + "_name", + "_attributes", + "_span_id", + "_trace_id", + "_parent_span_id", + "_segment", + "_sampled", + "_start_timestamp", + "_timestamp", + "_status", + "_start_timestamp_monotonic_ns", + "_scope", + "_flags", + "_context_manager_state", + "_profile", + "_continuous_profile", + ) + + def __init__( + self, + name: str, + trace_id: str, + attributes: Optional[Attributes] = None, + parent_span_id: Optional[str] = None, + segment: Optional[Span] = None, + scope: Optional[Scope] = None, + ) -> None: + self._name: str = name + self._attributes: "Attributes" = attributes + + self._trace_id = trace_id + self._parent_span_id = parent_span_id + self._segment = segment or self + + self._start_timestamp = datetime.now(timezone.utc) + + try: + # profiling depends on this value and requires that + # it is measured in nanoseconds + self._start_timestamp_monotonic_ns = nanosecond_time() + except AttributeError: + pass + + self._timestamp: "Optional[datetime]" = None + self._span_id: "Optional[str]" = None + self._status: SPANSTATUS = SPANSTATUS.OK + self._sampled: "Optional[bool]" = None + self._scope: "Optional[Scope]" = scope # TODO[span-first] when are we starting a span with a specific scope? is this needed? + self._flags: dict[str, bool] = {} + + self._update_active_thread() + self._set_profiler_id(get_profiler_id()) + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__}(" + f"name={self._name}, " + f"trace_id={self._trace_id}, " + f"span_id={self._span_id}, " + f"parent_span_id={self._parent_span_id}, " + f"sampled={self._sampled})>" + ) + + def __enter__(self) -> "Span": + scope = self._scope or sentry_sdk.get_current_scope() + old_span = scope.span + scope.span = self + self._context_manager_state = (scope, old_span) + + if self.is_segment() and self._profile is not None: + self._profile.__enter__() + + return self + + def __exit__( + self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]" + ) -> None: + if self.is_segment(): + if self._profile is not None: + self._profile.__exit__(ty, value, tb) + + if self._continuous_profile is not None: + self._continuous_profile.stop() + + if value is not None and should_be_treated_as_error(ty, value): + self.set_status(SPANSTATUS.INTERNAL_ERROR) + + with capture_internal_exceptions(): + scope, old_span = self._context_manager_state + del self._context_manager_state + self.end(scope=scope) + scope.span = old_span + + def end( + self, + end_timestamp: "Optional[Union[float, datetime]]" = None, + scope: "Optional[sentry_sdk.Scope]" = None, + ) -> "Optional[str]": + """ + Set the end timestamp of the span. + + :param end_timestamp: Optional timestamp that should + be used as timestamp instead of the current time. + :param scope: The scope to use for this transaction. + If not provided, the current scope will be used. + """ + client = sentry_sdk.get_client() + if not client.is_active(): + return None + + scope: "Optional[sentry_sdk.Scope]" = ( + scope or self._scope or sentry_sdk.get_current_scope() + ) + + # Explicit check against False needed because self.sampled might be None + if self._sampled is False: + logger.debug("Discarding span because sampled = False") + + # This is not entirely accurate because discards here are not + # exclusively based on sample rate but also traces sampler, but + # we handle this the same here. + if client.transport and has_tracing_enabled(client.options): + if client.monitor and client.monitor.downsample_factor > 0: + reason = "backpressure" + else: + reason = "sample_rate" + + client.transport.record_lost_event(reason, data_category="span") + + return None + + if self._sampled is None: + logger.warning("Discarding transaction without sampling decision.") + + if self.timestamp is not None: + # This span is already finished, ignore. + return None + + try: + if end_timestamp: + if isinstance(end_timestamp, float): + end_timestamp = datetime.fromtimestamp(end_timestamp, timezone.utc) + self.timestamp = end_timestamp + else: + elapsed = nanosecond_time() - self._start_timestamp_monotonic_ns + self.timestamp = self._start_timestamp + timedelta( + microseconds=elapsed / 1000 + ) + except AttributeError: + self.timestamp = datetime.now(timezone.utc) + + if self.segment.sampled: + client._capture_span(self) + return + + def get_attributes(self) -> Attributes: + return self._attributes + + def set_attribute(self, key: str, value: AttributeValue) -> None: + self._attributes[key] = format_attribute(value) + + def set_attributes(self, attributes: Attributes) -> None: + for key, value in attributes.items(): + self.set_attribute(key, value) + + def set_status(self, status: SPANSTATUS) -> None: + self._status = status + + def get_name(self) -> str: + return self._name + + def set_name(self, name: str) -> None: + self._name = name + + @property + def segment(self) -> "StreamedSpan": + return self._segment + + def is_segment(self) -> bool: + return self.segment == self + + @property + def sampled(self) -> "Optional[bool]": + return self._sampled + + @property + def span_id(self) -> str: + if not self._span_id: + self._span_id = uuid.uuid4().hex[16:] + + return self._span_id + + @property + def trace_id(self) -> str: + if not self._trace_id: + self._trace_id = uuid.uuid4().hex + + return self._trace_id + + def _update_active_thread(self) -> None: + thread_id, thread_name = get_current_thread_meta() + self._set_thread(thread_id, thread_name) + + def _set_thread( + self, thread_id: "Optional[int]", thread_name: "Optional[str]" + ) -> None: + if thread_id is not None: + self.set_attribute(SPANDATA.THREAD_ID, str(thread_id)) + + if thread_name is not None: + self.set_attribute(SPANDATA.THREAD_NAME, thread_name) + + def _set_profiler_id(self, profiler_id: "Optional[str]") -> None: + if profiler_id is not None: + self.set_attribute(SPANDATA.PROFILER_ID, profiler_id) + + def _set_http_status(self, http_status: int) -> None: + self.set_attribute(SPANDATA.HTTP_STATUS_CODE, http_status) + + if http_status >= 400: + self.set_status(SPANSTATUS.ERROR) + else: + self.set_status(SPANSTATUS.OK) diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index f45b849499..5b6e22be36 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -106,6 +106,13 @@ def has_tracing_enabled(options: "Optional[Dict[str, Any]]") -> bool: ) +def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool: + if options is None: + return False + + return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream" + + @contextlib.contextmanager def record_sql_queries( cursor: "Any", From 705790a29a03b9771a944d2643c090c37a9a0551 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 15 Jan 2026 13:47:43 +0100 Subject: [PATCH 2/5] More refactor, fixing some types --- sentry_sdk/_span_batcher.py | 26 +++++++++++--------------- sentry_sdk/_types.py | 2 -- sentry_sdk/consts.py | 4 +--- sentry_sdk/trace.py | 37 ++++++++++++++++++++++--------------- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 35ed09148e..0be0dbe8cb 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: from typing import Any, Callable, Optional - from sentry_sdk.trace import Span + from sentry_sdk.trace import SpanStatus, StreamedSpan class SpanBatcher(Batcher["Span"]): @@ -32,7 +32,7 @@ def __init__( # by trace_id, so that we can then send the buckets each in its own # envelope. # trace_id -> span buffer - self._span_buffer: dict[str, list["Span"]] = defaultdict(list) + self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) self._capture_func = capture_func self._record_lost_func = record_lost_func self._running = True @@ -47,7 +47,7 @@ def get_size(self) -> int: # caller is responsible for locking before checking this return sum(len(buffer) for buffer in self._span_buffer.values()) - def add(self, span: Span) -> None: + def add(self, span: "StreamedSpan") -> None: if not self._ensure_thread() or self._flusher is None: return None @@ -66,23 +66,19 @@ def add(self, span: Span) -> None: self._flush_event.set() @staticmethod - def _to_transport_format(item: "Span") -> "Any": - is_segment = item.containing_transaction == item - + def _to_transport_format(item: "StreamedSpan") -> "Any": res = { "trace_id": item.trace_id, "span_id": item.span_id, - "name": item.name if is_segment else item.description, - "status": SPANSTATUS.OK - if item.status == SPANSTATUS.OK - else SPANSTATUS.INTERNAL_ERROR, - "is_segment": is_segment, - "start_timestamp": item.start_timestamp.timestamp(), # TODO[span-first] - "end_timestamp": item.timestamp.timestamp(), + "name": item.get_name(), + "status": item._status, + "is_segment": item.is_segment(), + "start_timestamp": item._start_timestamp.timestamp(), # TODO[span-first] + "end_timestamp": item._timestamp.timestamp(), } - if item.parent_span_id: - res["parent_span_id"] = item.parent_span_id + if item._parent_span_id: + res["parent_span_id"] = item._parent_span_id if item._attributes: res["attributes"] = { diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index 5f17aaab11..204227aa00 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -294,8 +294,6 @@ class SDKInfo(TypedDict): }, ) - TraceLifecycleMode = Literal["static", "stream"] - # TODO: Make a proper type definition for this (PRs welcome!) Breadcrumb = Dict[str, Any] diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index cd1e8243e5..d682e8dc9f 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -2,8 +2,6 @@ from enum import Enum from typing import TYPE_CHECKING -from sentry_sdk._types import TraceLifecycleMode - # up top to prevent circular import due to integration import # This is more or less an arbitrary large-ish value for now, so that we allow # pretty long strings (like LLM prompts), but still have *some* upper limit @@ -84,7 +82,7 @@ class CompressionAlgo(Enum): "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], "enable_metrics": Optional[bool], "before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]], - "trace_lifecycle": Optional[TraceLifecycleMode], + "trace_lifecycle": Optional[Literal["static", "stream"]], }, total=False, ) diff --git a/sentry_sdk/trace.py b/sentry_sdk/trace.py index 90bfe69205..59e699f557 100644 --- a/sentry_sdk/trace.py +++ b/sentry_sdk/trace.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING import sentry_sdk -from sentry_sdk.consts import SPANDATA, SPANSTATUS +from sentry_sdk.consts import SPANDATA from sentry_sdk.profiler.continuous_profiler import get_profiler_id from sentry_sdk.tracing import Span from sentry_sdk.tracing_utils import has_span_streaming_enabled, has_tracing_enabled @@ -35,6 +35,8 @@ - dropped spans are not migrated - recheck transaction.finish <-> Streamedspan.end - profile not part of the event, how to send? +- maybe: use getters/setter OR properties but not both +- add size-based flushing to buffer(s) Notes: - removed ability to provide a start_timestamp @@ -50,8 +52,12 @@ def start_span( return sentry_sdk.get_current_scope().start_streamed_span() -BAGGAGE_HEADER_NAME = "baggage" -SENTRY_TRACE_HEADER_NAME = "sentry-trace" +class SpanStatus(str, Enum): + OK = "ok" + ERROR = "error" + + def __str__(self) -> str: + return self.value # Segment source, see @@ -73,6 +79,7 @@ def __str__(self) -> str: SegmentSource.URL, ] + SOURCE_FOR_STYLE = { "endpoint": SegmentSource.COMPONENT, "function_name": SegmentSource.COMPONENT, @@ -119,10 +126,10 @@ def __init__( self, name: str, trace_id: str, - attributes: Optional[Attributes] = None, - parent_span_id: Optional[str] = None, - segment: Optional[Span] = None, - scope: Optional[Scope] = None, + attributes: "Optional[Attributes]" = None, + parent_span_id: "Optional[str]" = None, + segment: "Optional[Span]" = None, + scope: "Optional[Scope]" = None, ) -> None: self._name: str = name self._attributes: "Attributes" = attributes @@ -142,7 +149,7 @@ def __init__( self._timestamp: "Optional[datetime]" = None self._span_id: "Optional[str]" = None - self._status: SPANSTATUS = SPANSTATUS.OK + self._status: SpanStatus = SpanStatus.OK self._sampled: "Optional[bool]" = None self._scope: "Optional[Scope]" = scope # TODO[span-first] when are we starting a span with a specific scope? is this needed? self._flags: dict[str, bool] = {} @@ -182,7 +189,7 @@ def __exit__( self._continuous_profile.stop() if value is not None and should_be_treated_as_error(ty, value): - self.set_status(SPANSTATUS.INTERNAL_ERROR) + self.set_status(SpanStatus.ERROR) with capture_internal_exceptions(): scope, old_span = self._context_manager_state @@ -252,17 +259,17 @@ def end( client._capture_span(self) return - def get_attributes(self) -> Attributes: + def get_attributes(self) -> "Attributes": return self._attributes - def set_attribute(self, key: str, value: AttributeValue) -> None: + def set_attribute(self, key: str, value: "AttributeValue") -> None: self._attributes[key] = format_attribute(value) - def set_attributes(self, attributes: Attributes) -> None: + def set_attributes(self, attributes: "Attributes") -> None: for key, value in attributes.items(): self.set_attribute(key, value) - def set_status(self, status: SPANSTATUS) -> None: + def set_status(self, status: SpanStatus) -> None: self._status = status def get_name(self) -> str: @@ -317,6 +324,6 @@ def _set_http_status(self, http_status: int) -> None: self.set_attribute(SPANDATA.HTTP_STATUS_CODE, http_status) if http_status >= 400: - self.set_status(SPANSTATUS.ERROR) + self.set_status(SpanStatus.ERROR) else: - self.set_status(SPANSTATUS.OK) + self.set_status(SpanStatus.OK) From b3f4f988ce55951096e148c4373f4efccc22ec41 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 15 Jan 2026 14:32:42 +0100 Subject: [PATCH 3/5] Cant use sentry_sdk.trace as that already exists --- sentry_sdk/trace.py => sentry_sdk._tracing.py | 12 +- sentry_sdk/_tracing.py | 353 ++++++++++++++++++ sentry_sdk/scope.py | 2 +- sentry_sdk/tracing.py | 4 + sentry_sdk/tracing_utils.py | 48 +++ 5 files changed, 417 insertions(+), 2 deletions(-) rename sentry_sdk/trace.py => sentry_sdk._tracing.py (97%) create mode 100644 sentry_sdk/_tracing.py diff --git a/sentry_sdk/trace.py b/sentry_sdk._tracing.py similarity index 97% rename from sentry_sdk/trace.py rename to sentry_sdk._tracing.py index 59e699f557..8dfd4de1b9 100644 --- a/sentry_sdk/trace.py +++ b/sentry_sdk._tracing.py @@ -7,7 +7,11 @@ from sentry_sdk.consts import SPANDATA from sentry_sdk.profiler.continuous_profiler import get_profiler_id from sentry_sdk.tracing import Span -from sentry_sdk.tracing_utils import has_span_streaming_enabled, has_tracing_enabled +from sentry_sdk.tracing_utils import ( + Baggage, + has_span_streaming_enabled, + has_tracing_enabled, +) from sentry_sdk.utils import ( capture_internal_exceptions, format_attribute, @@ -120,6 +124,7 @@ class StreamedSpan: "_context_manager_state", "_profile", "_continuous_profile", + "_baggage", ) def __init__( @@ -303,6 +308,11 @@ def trace_id(self) -> str: return self._trace_id + @property + def dynamic_sampling_context(self) -> str: + # TODO + return self.segment.get_baggage().dynamic_sampling_context() + def _update_active_thread(self) -> None: thread_id, thread_name = get_current_thread_meta() self._set_thread(thread_id, thread_name) diff --git a/sentry_sdk/_tracing.py b/sentry_sdk/_tracing.py new file mode 100644 index 0000000000..8ba0ea9e12 --- /dev/null +++ b/sentry_sdk/_tracing.py @@ -0,0 +1,353 @@ +import uuid +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import TYPE_CHECKING + +import sentry_sdk +from sentry_sdk.consts import SPANDATA +from sentry_sdk.profiler.continuous_profiler import get_profiler_id +from sentry_sdk.tracing_utils import ( + Baggage, + has_span_streaming_enabled, + has_tracing_enabled, +) +from sentry_sdk.utils import ( + capture_internal_exceptions, + format_attribute, + get_current_thread_meta, + logger, + nanosecond_time, + should_be_treated_as_error, +) + +if TYPE_CHECKING: + from typing import Any, Optional, Union + from sentry_sdk._types import Attributes, AttributeValue + from sentry_sdk.scope import Scope + + +FLAGS_CAPACITY = 10 + +""" +TODO[span-first] / notes +- redis, http, subprocess breadcrumbs (maybe_create_breadcrumbs_from_span) work + on op, change or ignore? +- @trace +- tags +- initial status: OK? or unset? +- dropped spans are not migrated +- recheck transaction.finish <-> Streamedspan.end +- profile not part of the event, how to send? +- maybe: use getters/setter OR properties but not both +- add size-based flushing to buffer(s) +- migrate transaction sample_rand logic + +Notes: +- removed ability to provide a start_timestamp +- moved _flags_capacity to a const +""" + + +def start_span( + name: str, + attributes: "Optional[Attributes]" = None, + parent_span: "Optional[StreamedSpan]" = None, +) -> "StreamedSpan": + return sentry_sdk.get_current_scope().start_streamed_span() + + +class SpanStatus(str, Enum): + OK = "ok" + ERROR = "error" + + def __str__(self) -> str: + return self.value + + +# Segment source, see +# https://getsentry.github.io/sentry-conventions/generated/attributes/sentry.html#sentryspansource +class SegmentSource(str, Enum): + COMPONENT = "component" + CUSTOM = "custom" + ROUTE = "route" + TASK = "task" + URL = "url" + VIEW = "view" + + def __str__(self) -> str: + return self.value + + +# These are typically high cardinality and the server hates them +LOW_QUALITY_SEGMENT_SOURCES = [ + SegmentSource.URL, +] + + +SOURCE_FOR_STYLE = { + "endpoint": SegmentSource.COMPONENT, + "function_name": SegmentSource.COMPONENT, + "handler_name": SegmentSource.COMPONENT, + "method_and_path_pattern": SegmentSource.ROUTE, + "path": SegmentSource.URL, + "route_name": SegmentSource.COMPONENT, + "route_pattern": SegmentSource.ROUTE, + "uri_template": SegmentSource.ROUTE, + "url": SegmentSource.ROUTE, +} + + +class StreamedSpan: + """ + A span holds timing information of a block of code. + + Spans can have multiple child spans thus forming a span tree. + + This is the Span First span implementation. The original transaction-based + span implementation lives in tracing.Span. + """ + + __slots__ = ( + "_name", + "_attributes", + "_span_id", + "_trace_id", + "_parent_span_id", + "_segment", + "_sampled", + "_start_timestamp", + "_timestamp", + "_status", + "_start_timestamp_monotonic_ns", + "_scope", + "_flags", + "_context_manager_state", + "_profile", + "_continuous_profile", + "_baggage", + "_sample_rate", + "_sample_rand", + ) + + def __init__( + self, + name: str, + trace_id: str, + attributes: "Optional[Attributes]" = None, + parent_span_id: "Optional[str]" = None, + segment: "Optional[StreamedSpan]" = None, + scope: "Optional[Scope]" = None, + ) -> None: + self._name: str = name + self._attributes: "Attributes" = attributes + + self._trace_id = trace_id + self._parent_span_id = parent_span_id + self._segment = segment or self + + self._start_timestamp = datetime.now(timezone.utc) + + try: + # profiling depends on this value and requires that + # it is measured in nanoseconds + self._start_timestamp_monotonic_ns = nanosecond_time() + except AttributeError: + pass + + self._timestamp: "Optional[datetime]" = None + self._span_id: "Optional[str]" = None + self._status: SpanStatus = SpanStatus.OK + self._sampled: "Optional[bool]" = None + self._scope: "Optional[Scope]" = scope # TODO[span-first] when are we starting a span with a specific scope? is this needed? + self._flags: dict[str, bool] = {} + + self._update_active_thread() + self._set_profiler_id(get_profiler_id()) + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__}(" + f"name={self._name}, " + f"trace_id={self._trace_id}, " + f"span_id={self._span_id}, " + f"parent_span_id={self._parent_span_id}, " + f"sampled={self._sampled})>" + ) + + def __enter__(self) -> "StreamedSpan": + scope = self._scope or sentry_sdk.get_current_scope() + old_span = scope.span + scope.span = self + self._context_manager_state = (scope, old_span) + + if self.is_segment() and self._profile is not None: + self._profile.__enter__() + + return self + + def __exit__( + self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]" + ) -> None: + if self.is_segment(): + if self._profile is not None: + self._profile.__exit__(ty, value, tb) + + if self._continuous_profile is not None: + self._continuous_profile.stop() + + if value is not None and should_be_treated_as_error(ty, value): + self.set_status(SpanStatus.ERROR) + + with capture_internal_exceptions(): + scope, old_span = self._context_manager_state + del self._context_manager_state + self.end(scope=scope) + scope.span = old_span + + def end( + self, + end_timestamp: "Optional[Union[float, datetime]]" = None, + scope: "Optional[sentry_sdk.Scope]" = None, + ) -> "Optional[str]": + """ + Set the end timestamp of the span. + + :param end_timestamp: Optional timestamp that should + be used as timestamp instead of the current time. + :param scope: The scope to use for this transaction. + If not provided, the current scope will be used. + """ + client = sentry_sdk.get_client() + if not client.is_active(): + return None + + scope: "Optional[sentry_sdk.Scope]" = ( + scope or self._scope or sentry_sdk.get_current_scope() + ) + + # Explicit check against False needed because self.sampled might be None + if self._sampled is False: + logger.debug("Discarding span because sampled = False") + + # This is not entirely accurate because discards here are not + # exclusively based on sample rate but also traces sampler, but + # we handle this the same here. + if client.transport and has_tracing_enabled(client.options): + if client.monitor and client.monitor.downsample_factor > 0: + reason = "backpressure" + else: + reason = "sample_rate" + + client.transport.record_lost_event(reason, data_category="span") + + return None + + if self._sampled is None: + logger.warning("Discarding transaction without sampling decision.") + + if self.timestamp is not None: + # This span is already finished, ignore. + return None + + try: + if end_timestamp: + if isinstance(end_timestamp, float): + end_timestamp = datetime.fromtimestamp(end_timestamp, timezone.utc) + self.timestamp = end_timestamp + else: + elapsed = nanosecond_time() - self._start_timestamp_monotonic_ns + self.timestamp = self._start_timestamp + timedelta( + microseconds=elapsed / 1000 + ) + except AttributeError: + self.timestamp = datetime.now(timezone.utc) + + if self.segment.sampled: + client._capture_span(self) + return + + def get_attributes(self) -> "Attributes": + return self._attributes + + def set_attribute(self, key: str, value: "AttributeValue") -> None: + self._attributes[key] = format_attribute(value) + + def set_attributes(self, attributes: "Attributes") -> None: + for key, value in attributes.items(): + self.set_attribute(key, value) + + def set_status(self, status: SpanStatus) -> None: + self._status = status + + def get_name(self) -> str: + return self._name + + def set_name(self, name: str) -> None: + self._name = name + + @property + def segment(self) -> "StreamedSpan": + return self._segment + + def is_segment(self) -> bool: + return self.segment == self + + @property + def sampled(self) -> "Optional[bool]": + return self._sampled + + @property + def span_id(self) -> str: + if not self._span_id: + self._span_id = uuid.uuid4().hex[16:] + + return self._span_id + + @property + def trace_id(self) -> str: + if not self._trace_id: + self._trace_id = uuid.uuid4().hex + + return self._trace_id + + @property + def dynamic_sampling_context(self) -> str: + # TODO + return self.segment.get_baggage().dynamic_sampling_context() + + def _update_active_thread(self) -> None: + thread_id, thread_name = get_current_thread_meta() + self._set_thread(thread_id, thread_name) + + def _set_thread( + self, thread_id: "Optional[int]", thread_name: "Optional[str]" + ) -> None: + if thread_id is not None: + self.set_attribute(SPANDATA.THREAD_ID, str(thread_id)) + + if thread_name is not None: + self.set_attribute(SPANDATA.THREAD_NAME, thread_name) + + def _set_profiler_id(self, profiler_id: "Optional[str]") -> None: + if profiler_id is not None: + self.set_attribute(SPANDATA.PROFILER_ID, profiler_id) + + def _set_http_status(self, http_status: int) -> None: + self.set_attribute(SPANDATA.HTTP_STATUS_CODE, http_status) + + if http_status >= 400: + self.set_status(SpanStatus.ERROR) + else: + self.set_status(SpanStatus.OK) + + def _get_baggage(self) -> "Baggage": + """ + Return the :py:class:`~sentry_sdk.tracing_utils.Baggage` associated with + the segment. + + The first time a new baggage with Sentry items is made, it will be frozen. + """ + if not self._baggage or self._baggage.mutable: + self._baggage = Baggage.populate_from_segment(self) + + return self._baggage diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index eee951b7f9..97140c6227 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -32,7 +32,7 @@ normalize_incoming_data, PropagationContext, ) -from sentry_sdk.trace import StreamedSpan +from sentry_sdk._tracing import StreamedSpan from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py index c4b38e4528..9160ae7b20 100644 --- a/sentry_sdk/tracing.py +++ b/sentry_sdk/tracing.py @@ -126,6 +126,10 @@ class TransactionKwargs(SpanKwargs, total=False): }, ) +# TODO: Once the old Span class is gone, move _tracing.py to tracing.py. This is +# here for now so that you can do sentry_sdk.tracing.start_span for the new API. +from sentry_sdk._tracing import start_span + BAGGAGE_HEADER_NAME = "baggage" SENTRY_TRACE_HEADER_NAME = "sentry-trace" diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 5b6e22be36..a1e8fbc4fe 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -749,6 +749,54 @@ def populate_from_transaction( return Baggage(sentry_items, mutable=False) + @classmethod + def populate_from_segment( + cls, segment: "sentry_sdk.trace.StreamedSpan" + ) -> "Baggage": + """ + Populate fresh baggage entry with sentry_items and make it immutable + if this is the head SDK which originates traces. + """ + client = sentry_sdk.get_client() + sentry_items: "Dict[str, str]" = {} + + if not client.is_active(): + return Baggage(sentry_items) + + options = client.options or {} + + sentry_items["trace_id"] = segment.trace_id + sentry_items["sample_rand"] = f"{segment._sample_rand:.6f}" # noqa: E231 + + if options.get("environment"): + sentry_items["environment"] = options["environment"] + + if options.get("release"): + sentry_items["release"] = options["release"] + + if client.parsed_dsn: + sentry_items["public_key"] = client.parsed_dsn.public_key + if client.parsed_dsn.org_id: + sentry_items["org_id"] = client.parsed_dsn.org_id + + if segment.source not in LOW_QUALITY_TRANSACTION_SOURCES: + sentry_items["transaction"] = segment.name + + if segment._sample_rate is not None: + sentry_items["sample_rate"] = str(segment._sample_rate) + + if segment._sampled is not None: + sentry_items["sampled"] = "true" if segment._sampled else "false" + + # There's an existing baggage but it was mutable, which is why we are + # creating this new baggage. + # However, if by chance the user put some sentry items in there, give + # them precedence. + if segment._baggage and segment._baggage.sentry_items: + sentry_items.update(segment._baggage.sentry_items) + + return Baggage(sentry_items, mutable=False) + def freeze(self) -> None: self.mutable = False From 97cf2916a420fec586c1554791db9f3ca0c49d7b Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 15 Jan 2026 14:38:53 +0100 Subject: [PATCH 4/5] bubu --- sentry_sdk._tracing.py | 339 ----------------------------------------- 1 file changed, 339 deletions(-) delete mode 100644 sentry_sdk._tracing.py diff --git a/sentry_sdk._tracing.py b/sentry_sdk._tracing.py deleted file mode 100644 index 8dfd4de1b9..0000000000 --- a/sentry_sdk._tracing.py +++ /dev/null @@ -1,339 +0,0 @@ -import uuid -from datetime import datetime, timedelta, timezone -from enum import Enum -from typing import TYPE_CHECKING - -import sentry_sdk -from sentry_sdk.consts import SPANDATA -from sentry_sdk.profiler.continuous_profiler import get_profiler_id -from sentry_sdk.tracing import Span -from sentry_sdk.tracing_utils import ( - Baggage, - has_span_streaming_enabled, - has_tracing_enabled, -) -from sentry_sdk.utils import ( - capture_internal_exceptions, - format_attribute, - get_current_thread_meta, - logger, - nanosecond_time, - should_be_treated_as_error, -) - -if TYPE_CHECKING: - from typing import Any, Optional, Union - from sentry_sdk._types import Attributes, AttributeValue - from sentry_sdk.scope import Scope - - -FLAGS_CAPACITY = 10 - -""" -TODO[span-first] / notes -- redis, http, subprocess breadcrumbs (maybe_create_breadcrumbs_from_span) work - on op, change or ignore? -- @trace -- tags -- initial status: OK? or unset? -- dropped spans are not migrated -- recheck transaction.finish <-> Streamedspan.end -- profile not part of the event, how to send? -- maybe: use getters/setter OR properties but not both -- add size-based flushing to buffer(s) - -Notes: -- removed ability to provide a start_timestamp -- moved _flags_capacity to a const -""" - - -def start_span( - name: str, - attributes: "Optional[Attributes]" = None, - parent_span: "Optional[Span]" = None, -) -> Span: - return sentry_sdk.get_current_scope().start_streamed_span() - - -class SpanStatus(str, Enum): - OK = "ok" - ERROR = "error" - - def __str__(self) -> str: - return self.value - - -# Segment source, see -# https://getsentry.github.io/sentry-conventions/generated/attributes/sentry.html#sentryspansource -class SegmentSource(str, Enum): - COMPONENT = "component" - CUSTOM = "custom" - ROUTE = "route" - TASK = "task" - URL = "url" - VIEW = "view" - - def __str__(self) -> str: - return self.value - - -# These are typically high cardinality and the server hates them -LOW_QUALITY_SEGMENT_SOURCES = [ - SegmentSource.URL, -] - - -SOURCE_FOR_STYLE = { - "endpoint": SegmentSource.COMPONENT, - "function_name": SegmentSource.COMPONENT, - "handler_name": SegmentSource.COMPONENT, - "method_and_path_pattern": SegmentSource.ROUTE, - "path": SegmentSource.URL, - "route_name": SegmentSource.COMPONENT, - "route_pattern": SegmentSource.ROUTE, - "uri_template": SegmentSource.ROUTE, - "url": SegmentSource.ROUTE, -} - - -class StreamedSpan: - """ - A span holds timing information of a block of code. - - Spans can have multiple child spans thus forming a span tree. - - This is the Span First span implementation. The original transaction-based - span implementation lives in tracing.Span. - """ - - __slots__ = ( - "_name", - "_attributes", - "_span_id", - "_trace_id", - "_parent_span_id", - "_segment", - "_sampled", - "_start_timestamp", - "_timestamp", - "_status", - "_start_timestamp_monotonic_ns", - "_scope", - "_flags", - "_context_manager_state", - "_profile", - "_continuous_profile", - "_baggage", - ) - - def __init__( - self, - name: str, - trace_id: str, - attributes: "Optional[Attributes]" = None, - parent_span_id: "Optional[str]" = None, - segment: "Optional[Span]" = None, - scope: "Optional[Scope]" = None, - ) -> None: - self._name: str = name - self._attributes: "Attributes" = attributes - - self._trace_id = trace_id - self._parent_span_id = parent_span_id - self._segment = segment or self - - self._start_timestamp = datetime.now(timezone.utc) - - try: - # profiling depends on this value and requires that - # it is measured in nanoseconds - self._start_timestamp_monotonic_ns = nanosecond_time() - except AttributeError: - pass - - self._timestamp: "Optional[datetime]" = None - self._span_id: "Optional[str]" = None - self._status: SpanStatus = SpanStatus.OK - self._sampled: "Optional[bool]" = None - self._scope: "Optional[Scope]" = scope # TODO[span-first] when are we starting a span with a specific scope? is this needed? - self._flags: dict[str, bool] = {} - - self._update_active_thread() - self._set_profiler_id(get_profiler_id()) - - def __repr__(self) -> str: - return ( - f"<{self.__class__.__name__}(" - f"name={self._name}, " - f"trace_id={self._trace_id}, " - f"span_id={self._span_id}, " - f"parent_span_id={self._parent_span_id}, " - f"sampled={self._sampled})>" - ) - - def __enter__(self) -> "Span": - scope = self._scope or sentry_sdk.get_current_scope() - old_span = scope.span - scope.span = self - self._context_manager_state = (scope, old_span) - - if self.is_segment() and self._profile is not None: - self._profile.__enter__() - - return self - - def __exit__( - self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]" - ) -> None: - if self.is_segment(): - if self._profile is not None: - self._profile.__exit__(ty, value, tb) - - if self._continuous_profile is not None: - self._continuous_profile.stop() - - if value is not None and should_be_treated_as_error(ty, value): - self.set_status(SpanStatus.ERROR) - - with capture_internal_exceptions(): - scope, old_span = self._context_manager_state - del self._context_manager_state - self.end(scope=scope) - scope.span = old_span - - def end( - self, - end_timestamp: "Optional[Union[float, datetime]]" = None, - scope: "Optional[sentry_sdk.Scope]" = None, - ) -> "Optional[str]": - """ - Set the end timestamp of the span. - - :param end_timestamp: Optional timestamp that should - be used as timestamp instead of the current time. - :param scope: The scope to use for this transaction. - If not provided, the current scope will be used. - """ - client = sentry_sdk.get_client() - if not client.is_active(): - return None - - scope: "Optional[sentry_sdk.Scope]" = ( - scope or self._scope or sentry_sdk.get_current_scope() - ) - - # Explicit check against False needed because self.sampled might be None - if self._sampled is False: - logger.debug("Discarding span because sampled = False") - - # This is not entirely accurate because discards here are not - # exclusively based on sample rate but also traces sampler, but - # we handle this the same here. - if client.transport and has_tracing_enabled(client.options): - if client.monitor and client.monitor.downsample_factor > 0: - reason = "backpressure" - else: - reason = "sample_rate" - - client.transport.record_lost_event(reason, data_category="span") - - return None - - if self._sampled is None: - logger.warning("Discarding transaction without sampling decision.") - - if self.timestamp is not None: - # This span is already finished, ignore. - return None - - try: - if end_timestamp: - if isinstance(end_timestamp, float): - end_timestamp = datetime.fromtimestamp(end_timestamp, timezone.utc) - self.timestamp = end_timestamp - else: - elapsed = nanosecond_time() - self._start_timestamp_monotonic_ns - self.timestamp = self._start_timestamp + timedelta( - microseconds=elapsed / 1000 - ) - except AttributeError: - self.timestamp = datetime.now(timezone.utc) - - if self.segment.sampled: - client._capture_span(self) - return - - def get_attributes(self) -> "Attributes": - return self._attributes - - def set_attribute(self, key: str, value: "AttributeValue") -> None: - self._attributes[key] = format_attribute(value) - - def set_attributes(self, attributes: "Attributes") -> None: - for key, value in attributes.items(): - self.set_attribute(key, value) - - def set_status(self, status: SpanStatus) -> None: - self._status = status - - def get_name(self) -> str: - return self._name - - def set_name(self, name: str) -> None: - self._name = name - - @property - def segment(self) -> "StreamedSpan": - return self._segment - - def is_segment(self) -> bool: - return self.segment == self - - @property - def sampled(self) -> "Optional[bool]": - return self._sampled - - @property - def span_id(self) -> str: - if not self._span_id: - self._span_id = uuid.uuid4().hex[16:] - - return self._span_id - - @property - def trace_id(self) -> str: - if not self._trace_id: - self._trace_id = uuid.uuid4().hex - - return self._trace_id - - @property - def dynamic_sampling_context(self) -> str: - # TODO - return self.segment.get_baggage().dynamic_sampling_context() - - def _update_active_thread(self) -> None: - thread_id, thread_name = get_current_thread_meta() - self._set_thread(thread_id, thread_name) - - def _set_thread( - self, thread_id: "Optional[int]", thread_name: "Optional[str]" - ) -> None: - if thread_id is not None: - self.set_attribute(SPANDATA.THREAD_ID, str(thread_id)) - - if thread_name is not None: - self.set_attribute(SPANDATA.THREAD_NAME, thread_name) - - def _set_profiler_id(self, profiler_id: "Optional[str]") -> None: - if profiler_id is not None: - self.set_attribute(SPANDATA.PROFILER_ID, profiler_id) - - def _set_http_status(self, http_status: int) -> None: - self.set_attribute(SPANDATA.HTTP_STATUS_CODE, http_status) - - if http_status >= 400: - self.set_status(SpanStatus.ERROR) - else: - self.set_status(SpanStatus.OK) From 0d2097bf8d5f9416222fc2d349f7081874aa9fad Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 15 Jan 2026 14:53:05 +0100 Subject: [PATCH 5/5] dsc, sampling --- sentry_sdk/_span_batcher.py | 8 ++-- sentry_sdk/_tracing.py | 84 +++++++++++++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 0be0dbe8cb..ecf3518116 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: from typing import Any, Callable, Optional - from sentry_sdk.trace import SpanStatus, StreamedSpan + from sentry_sdk._tracing import SpanStatus, StreamedSpan class SpanBatcher(Batcher["Span"]): @@ -87,8 +87,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": return res - def _flush(self): - # type: (...) -> Optional[Envelope] + def _flush(self) -> Optional[Envelope]: from sentry_sdk.utils import format_timestamp with self._lock: @@ -97,8 +96,7 @@ def _flush(self): for trace_id, spans in self._span_buffer.items(): if spans: - trace_context = spans[0].get_trace_context() - dsc = trace_context.get("dynamic_sampling_context") + dsc = spans[0].dynamic_sampling_context() # XXX[span-first]: empty dsc? envelope = Envelope( diff --git a/sentry_sdk/_tracing.py b/sentry_sdk/_tracing.py index 8ba0ea9e12..cfcc652f8f 100644 --- a/sentry_sdk/_tracing.py +++ b/sentry_sdk/_tracing.py @@ -15,6 +15,7 @@ capture_internal_exceptions, format_attribute, get_current_thread_meta, + is_valid_sample_rate, logger, nanosecond_time, should_be_treated_as_error, @@ -22,7 +23,7 @@ if TYPE_CHECKING: from typing import Any, Optional, Union - from sentry_sdk._types import Attributes, AttributeValue + from sentry_sdk._types import Attributes, AttributeValue, SamplingContext from sentry_sdk.scope import Scope @@ -312,8 +313,7 @@ def trace_id(self) -> str: @property def dynamic_sampling_context(self) -> str: - # TODO - return self.segment.get_baggage().dynamic_sampling_context() + return self.segment._get_baggage().dynamic_sampling_context() def _update_active_thread(self) -> None: thread_id, thread_name = get_current_thread_meta() @@ -351,3 +351,81 @@ def _get_baggage(self) -> "Baggage": self._baggage = Baggage.populate_from_segment(self) return self._baggage + + def _set_initial_sampling_decision( + self, sampling_context: "SamplingContext" + ) -> None: + """ + Sets the segment's sampling decision, according to the following + precedence rules: + + 1. If `traces_sampler` is defined, its decision will be used. It can + choose to keep or ignore any parent sampling decision, or use the + sampling context data to make its own decision or to choose a sample + rate for the transaction. + + 2. If `traces_sampler` is not defined, but there's a parent sampling + decision, the parent sampling decision will be used. + + 3. If `traces_sampler` is not defined and there's no parent sampling + decision, `traces_sample_rate` will be used. + """ + client = sentry_sdk.get_client() + + # nothing to do if tracing is disabled + if not has_tracing_enabled(client.options): + self.sampled = False + return + + if not self.is_segment(): + return + + traces_sampler_defined = callable(client.options.get("traces_sampler")) + + # We would have bailed already if neither `traces_sampler` nor + # `traces_sample_rate` were defined, so one of these should work; prefer + # the hook if so + if traces_sampler_defined: + sample_rate = client.options["traces_sampler"](sampling_context) + else: + if sampling_context["parent_sampled"] is not None: + sample_rate = sampling_context["parent_sampled"] + else: + sample_rate = client.options["traces_sample_rate"] + + # Since this is coming from the user (or from a function provided by the + # user), who knows what we might get. (The only valid values are + # booleans or numbers between 0 and 1.) + if not is_valid_sample_rate(sample_rate, source="Tracing"): + logger.warning( + f"[Tracing] Discarding {self._name} because of invalid sample rate." + ) + self.sampled = False + return + + self.sample_rate = float(sample_rate) + + if client.monitor: + self.sample_rate /= 2**client.monitor.downsample_factor + + # if the function returned 0 (or false), or if `traces_sample_rate` is + # 0, it's a sign the transaction should be dropped + if not self.sample_rate: + if traces_sampler_defined: + reason = "traces_sampler returned 0 or False" + else: + reason = "traces_sample_rate is set to 0" + + logger.debug(f"[Tracing] Discarding {self._name} because {reason}") + self.sampled = False + return + + # Now we roll the dice. + self.sampled = self._sample_rand < self.sample_rate + + if self.sampled: + logger.debug(f"[Tracing] Starting {self.name}") + else: + logger.debug( + f"[Tracing] Discarding {self.name} because it's not included in the random sample (sampling rate = {self.sample_rate})" + )