118 changes: 118 additions & 0 deletions sentry_sdk/_span_batcher.py
@@ -0,0 +1,118 @@
import threading
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk.consts import SPANSTATUS
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr

if TYPE_CHECKING:
    from typing import Any, Callable, Optional
    from sentry_sdk.traces import StreamedSpan
    from sentry_sdk._types import SerializedAttributeValue


class SpanBatcher(Batcher["StreamedSpan"]):
    # TODO[span-first]: size-based flushes
    # TODO[span-first]: adjust flush/drop defaults
    MAX_BEFORE_FLUSH = 1000
    MAX_BEFORE_DROP = 5000
    FLUSH_WAIT_TIME = 5.0

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we
        # bucket by trace_id, so that we can then send each bucket in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def get_size(self) -> int:
        # caller is responsible for locking before checking this
Contributor: why can we not, or should we not, lock for the caller here?

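One plausible answer, reading add() below: it calls get_size() while already holding self._lock, and threading.Lock is not reentrant, so acquiring the lock inside get_size() would deadlock. A minimal sketch of that failure mode (hypothetical code, not part of this PR):

    def get_size(self) -> int:
        with self._lock:  # add() already holds this non-reentrant lock
            # deadlock: the same thread blocks forever re-acquiring self._lock
            return sum(len(buffer) for buffer in self._span_buffer.values())

Switching to threading.RLock would permit re-acquisition, but the caller-locks convention also keeps the size check and the subsequent append atomic inside add().
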
        return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, span: "StreamedSpan") -> None:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = self.get_size()
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    @staticmethod
    def _to_transport_format(item: "StreamedSpan") -> "Any":
        # TODO[span-first]
        res: "dict[str, Any]" = {}
        return res

    def _flush(self) -> None:
        with self._lock:
            if len(self._span_buffer) == 0:
                return None

            envelopes = []
            for trace_id, spans in self._span_buffer.items():
                if spans:
                    # TODO[span-first]
                    # dsc = spans[0].dynamic_sampling_context()
                    dsc = None

                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(datetime.now(timezone.utc)),
                            "trace": dsc,
                        }
                    )
Envelope header unconditionally includes null trace context

Low Severity

The _flush method unconditionally includes "trace": dsc in envelope headers even when dsc is None. This results in "trace": null in the serialized JSON. The established pattern elsewhere in the codebase (e.g., in client.py around line 909) is to only include the trace header when the dynamic sampling context is truthy. Including a null trace header is inconsistent and could potentially cause issues with the Sentry backend when this experimental feature is enabled.

Contributor Author: All coming in future PRs
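
A minimal sketch of the truthiness-gated header the comment describes (following the stated client.py pattern; hypothetical, not code from this PR):

    headers: "dict[str, Any]" = {
        "sent_at": format_timestamp(datetime.now(timezone.utc)),
    }
    if dsc:
        # only attach the trace header when the dynamic sampling context is non-empty
        headers["trace"] = dsc
    envelope = Envelope(headers=headers)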


                    envelope.add_item(
                        Item(
                            type="span",
                            content_type="application/vnd.sentry.items.span.v2+json",
                            headers={
                                "item_count": len(spans),
                            },
                            payload=PayloadRef(
                                json={
                                    "items": [
                                        self._to_transport_format(span)
                                        for span in spans
                                    ]
                                }
                            ),
                        )
                    )

                    envelopes.append(envelope)

            self._span_buffer.clear()

        for envelope in envelopes:
            self._capture_func(envelope)
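
A rough sketch of how the thresholds play out, based only on the add() logic shown above (the Batcher base class and its _ensure_thread/flusher loop are not part of this diff, so their behavior is assumed):

    captured_envelopes = []
    lost_reports = []

    batcher = SpanBatcher(
        capture_func=captured_envelopes.append,
        record_lost_func=lambda **kwargs: lost_reports.append(kwargs),
    )
    # Spans 1..999 are buffered per trace_id with no flush signal.
    # Buffering span #1000 makes size + 1 >= MAX_BEFORE_FLUSH, so add() sets
    # _flush_event and the background flusher drains the buffer, emitting one
    # envelope per trace_id via capture_func.
    # If the buffer ever reaches MAX_BEFORE_DROP (5000), add() rejects the span
    # and calls record_lost_func(reason="queue_overflow", data_category="span",
    # quantity=1) instead of buffering it.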
19 changes: 18 additions & 1 deletion sentry_sdk/client.py
@@ -11,6 +11,7 @@
import sentry_sdk
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
from sentry_sdk._metrics_batcher import MetricsBatcher
from sentry_sdk._span_batcher import SpanBatcher
from sentry_sdk.utils import (
    AnnotatedValue,
    ContextVar,
@@ -31,6 +32,7 @@
)
from sentry_sdk.serializer import serialize
from sentry_sdk.tracing import trace
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.transport import BaseHttpTransport, make_transport
from sentry_sdk.consts import (
    SPANDATA,
@@ -188,6 +190,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
        self.monitor: "Optional[Monitor]" = None
        self.log_batcher: "Optional[LogBatcher]" = None
        self.metrics_batcher: "Optional[MetricsBatcher]" = None
        self.span_batcher: "Optional[SpanBatcher]" = None
        self.integrations: "dict[str, Integration]" = {}

    def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
Expand Down Expand Up @@ -399,6 +402,13 @@ def _record_lost_event(
record_lost_func=_record_lost_event,
)

self.span_batcher = None
if has_span_streaming_enabled(self.options):
self.span_batcher = SpanBatcher(
capture_func=_capture_envelope,
record_lost_func=_record_lost_event,
)

max_request_body_size = ("always", "never", "small", "medium")
if self.options["max_request_body_size"] not in max_request_body_size:
raise ValueError(
@@ -909,7 +919,10 @@ def capture_event(
        return return_value

    def _capture_telemetry(
        self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
        self,
        telemetry: "Optional[Union[Log, Metric]]",
        ty: str,
        scope: "Scope",
    ) -> None:
        # Capture attributes-based telemetry (logs, metrics, spansV2)
        if telemetry is None:
@@ -993,6 +1006,8 @@ def close(
                self.log_batcher.kill()
            if self.metrics_batcher is not None:
                self.metrics_batcher.kill()
            if self.span_batcher is not None:
                self.span_batcher.kill()
            if self.monitor:
                self.monitor.kill()
            self.transport.kill()
@@ -1018,6 +1033,8 @@ def flush(
                self.log_batcher.flush()
            if self.metrics_batcher is not None:
                self.metrics_batcher.flush()
            if self.span_batcher is not None:
                self.span_batcher.flush()
            self.transport.flush(timeout=timeout, callback=callback)

    def __enter__(self) -> "_Client":
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
@@ -82,6 +82,7 @@ class CompressionAlgo(Enum):
        "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
        "enable_metrics": Optional[bool],
        "before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
        "trace_lifecycle": Optional[Literal["static", "stream"]],
    },
    total=False,
)
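
The new Experiments key is what has_span_streaming_enabled (later in this diff) checks; enabling it mirrors the traces.py docstring (the DSN below is a placeholder):

    import sentry_sdk

    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder
        _experiments={"trace_lifecycle": "stream"},
    )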
39 changes: 39 additions & 0 deletions sentry_sdk/traces.py
@@ -0,0 +1,39 @@
"""
The API in this file is only meant to be used in span streaming mode.

You can enable span streaming mode via
sentry_sdk.init(_experiments={"trace_lifecycle": "stream"}).
"""

import uuid
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Optional


class StreamedSpan:
"""
A span holds timing information of a block of code.

Spans can have multiple child spans thus forming a span tree.

This is the Span First span implementation. The original transaction-based
span implementation lives in tracing.Span.
"""

__slots__ = ("_trace_id",)

def __init__(
self,
*,
trace_id: "Optional[str]" = None,
):
self._trace_id = trace_id

@property
def trace_id(self) -> str:
if not self._trace_id:
self._trace_id = uuid.uuid4().hex

return self._trace_id
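
A quick usage sketch of the lazy trace_id property (illustrative only):

    from sentry_sdk.traces import StreamedSpan

    span = StreamedSpan()            # no trace_id supplied
    tid = span.trace_id              # generated on first access via uuid4().hex
    assert span.trace_id == tid      # cached: later reads return the same id

    sibling = StreamedSpan(trace_id=tid)  # same trace_id, so SpanBatcher would
                                          # bucket both spans into one envelope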
7 changes: 7 additions & 0 deletions sentry_sdk/tracing_utils.py
@@ -106,6 +106,13 @@ def has_tracing_enabled(options: "Optional[Dict[str, Any]]") -> bool:
    )


def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool:
    if options is None:
        return False

    return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream"


@contextlib.contextmanager
def record_sql_queries(
    cursor: "Any",
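
A few illustrative calls against has_span_streaming_enabled as defined above:

    assert has_span_streaming_enabled(None) is False
    assert has_span_streaming_enabled({}) is False
    assert has_span_streaming_enabled({"_experiments": None}) is False  # the `or {}` guard
    assert has_span_streaming_enabled(
        {"_experiments": {"trace_lifecycle": "stream"}}
    ) is True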