Skip to content

Commit 757ee12

Browse files
committed
feat: Span streaming & new span API
1 parent ae502a4 commit 757ee12

File tree

8 files changed

+553
-3
lines changed

8 files changed

+553
-3
lines changed

sentry_sdk/_span_batcher.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import threading
2+
from collections import defaultdict
3+
from datetime import datetime, timezone
4+
from typing import TYPE_CHECKING
5+
6+
from sentry_sdk._batcher import Batcher
7+
from sentry_sdk.consts import SPANSTATUS
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
from sentry_sdk.utils import serialize_attribute, safe_repr
10+
11+
if TYPE_CHECKING:
12+
from typing import Any, Callable, Optional
13+
from sentry_sdk.trace import Span
14+
15+
16+
class SpanBatcher(Batcher["Span"]):
    """Buffer spans and send them to Sentry as span (V2) envelope items.

    Spans are bucketed by ``trace_id``: an envelope carries one shared
    ``trace`` header, so spans belonging to different traces must be sent in
    separate envelopes.
    """

    # TODO[span-first]: size-based flushes
    # Buffered span count at which ``add`` signals the flush event.
    MAX_BEFORE_FLUSH = 1000
    # Buffered span count at which ``add`` starts dropping new spans.
    MAX_BEFORE_DROP = 5000
    # Not read in this class; presumably consumed by the ``Batcher`` flush
    # loop as the wait between flushes -- TODO confirm.
    FLUSH_WAIT_TIME = 5.0

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        """Set up the per-trace buffers and the flusher bookkeeping.

        :param capture_func: Called with each envelope produced by a flush.
        :param record_lost_func: Called with ``reason``, ``data_category``
            and ``quantity`` keyword arguments when a span is dropped.
        """
        # Spans from different traces cannot be emitted in the same envelope
        # since the envelope contains a shared trace header. That's why we bucket
        # by trace_id, so that we can then send the buckets each in its own
        # envelope.
        # trace_id -> span buffer
        self._span_buffer: "dict[str, list[Span]]" = defaultdict(list)
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def get_size(self) -> int:
        """Return the total number of buffered spans across all traces.

        The caller is responsible for holding ``self._lock`` while calling
        this.
        """
        return sum(len(buffer) for buffer in self._span_buffer.values())

    # ``Span`` is only imported under TYPE_CHECKING, so the annotation has to
    # be a string; a bare ``Span`` raises NameError when the class is defined.
    def add(self, span: "Span") -> None:
        """Buffer ``span`` under its ``trace_id``.

        Does nothing if the flusher thread is unavailable. Once
        ``MAX_BEFORE_DROP`` spans are buffered the span is dropped and
        reported through ``record_lost_func`` instead. Reaching
        ``MAX_BEFORE_FLUSH`` sets the flush event.
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = self.get_size()
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    @staticmethod
    def _to_transport_format(item: "Span") -> "Any":
        """Convert ``item`` into the dict sent as one V2 span payload entry.

        ``parent_span_id`` and ``attributes`` are only included when the span
        has them.
        """
        is_segment = item.containing_transaction == item

        # The V2 status is binary: anything that is not OK is reported as the
        # span-first specific ERROR status rather than a legacy status value.
        status = (
            SPANSTATUS.OK if item.status == SPANSTATUS.OK else SPANSTATUS.ERROR
        )

        res = {
            "trace_id": item.trace_id,
            "span_id": item.span_id,
            "name": item.name if is_segment else item.description,
            "status": status,
            "is_segment": is_segment,
            "start_timestamp": item.start_timestamp.timestamp(),  # TODO[span-first]
            # NOTE(review): assumes the span is finished, i.e. ``timestamp``
            # is set -- confirm callers only add finished spans.
            "end_timestamp": item.timestamp.timestamp(),
        }

        if item.parent_span_id:
            res["parent_span_id"] = item.parent_span_id

        if item._attributes:
            res["attributes"] = {
                k: serialize_attribute(v) for (k, v) in item._attributes.items()
            }

        return res

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """Send every buffered trace bucket in its own envelope.

        Returns the last envelope that was captured, or ``None`` when there
        was nothing to send.
        """
        from sentry_sdk.utils import format_timestamp

        envelopes = []  # type: list[Envelope]

        with self._lock:
            if not self._span_buffer:
                return None

            for spans in self._span_buffer.values():
                if not spans:
                    continue

                trace_context = spans[0].get_trace_context()
                dsc = trace_context.get("dynamic_sampling_context")
                # XXX[span-first]: empty dsc?

                envelope = Envelope(
                    headers={
                        "sent_at": format_timestamp(datetime.now(timezone.utc)),
                        "trace": dsc,
                    }
                )

                envelope.add_item(
                    Item(
                        type=self.TYPE,
                        content_type=self.CONTENT_TYPE,
                        headers={
                            "item_count": len(spans),
                        },
                        payload=PayloadRef(
                            json={
                                "items": [
                                    self._to_transport_format(span)
                                    for span in spans
                                ]
                            }
                        ),
                    )
                )
                envelopes.append(envelope)

            self._span_buffer.clear()

        # Capture outside the lock, and capture *every* envelope: capturing
        # only once after the loop would send just the last trace's spans and
        # silently discard the rest.
        for envelope in envelopes:
            self._capture_func(envelope)

        return envelopes[-1] if envelopes else None

sentry_sdk/_types.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import TYPE_CHECKING, TypeVar, Union
22

3+
from sentry_sdk.consts import SPANSTATUS
4+
35

46
# Re-exported for compat, since code out there in the wild might use this variable.
57
MYPY = TYPE_CHECKING
@@ -274,6 +276,26 @@ class SDKInfo(TypedDict):
274276

275277
MetricProcessor = Callable[[Metric, Hint], Optional[Metric]]
276278

279+
# V2 spans only distinguish success from failure. The members are written as
# string literals because ``Literal[...]`` accepts literal values only, not
# attribute lookups on a plain class such as ``SPANSTATUS.OK``.
# NOTE(review): "ok" is meant to mirror SPANSTATUS.OK and "error" mirrors
# SPANSTATUS.ERROR -- confirm against sentry_sdk.consts.
SpanV2Status = Literal["ok", "error"]
# This is the V2 span format
# https://develop.sentry.dev/sdk/telemetry/spans/span-protocol/
SpanV2 = TypedDict(
    "SpanV2",
    {
        "trace_id": str,
        "span_id": str,
        "parent_span_id": Optional[str],
        "name": str,
        "status": SpanV2Status,
        "is_segment": bool,
        "start_timestamp": float,
        "end_timestamp": float,
        "attributes": Attributes,
    },
)

# Accepted values for the ``trace_lifecycle`` option.
TraceLifecycleMode = Literal["static", "stream"]
298+
277299
# TODO: Make a proper type definition for this (PRs welcome!)
278300
Breadcrumb = Dict[str, Any]
279301

sentry_sdk/client.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import sentry_sdk
1212
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
1313
from sentry_sdk._metrics_batcher import MetricsBatcher
14+
from sentry_sdk._span_batcher import SpanBatcher
1415
from sentry_sdk.utils import (
1516
AnnotatedValue,
1617
ContextVar,
@@ -31,6 +32,7 @@
3132
)
3233
from sentry_sdk.serializer import serialize
3334
from sentry_sdk.tracing import trace
35+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
3436
from sentry_sdk.transport import BaseHttpTransport, make_transport
3537
from sentry_sdk.consts import (
3638
SPANDATA,
@@ -67,6 +69,7 @@
6769
from sentry_sdk.scope import Scope
6870
from sentry_sdk.session import Session
6971
from sentry_sdk.spotlight import SpotlightClient
72+
from sentry_sdk.trace import StreamedSpan
7073
from sentry_sdk.transport import Transport, Item
7174
from sentry_sdk._log_batcher import LogBatcher
7275
from sentry_sdk._metrics_batcher import MetricsBatcher
@@ -188,6 +191,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
188191
self.monitor: "Optional[Monitor]" = None
189192
self.log_batcher: "Optional[LogBatcher]" = None
190193
self.metrics_batcher: "Optional[MetricsBatcher]" = None
194+
self.span_batcher: "Optional[SpanBatcher]" = None
191195
self.integrations: "dict[str, Integration]" = {}
192196

193197
def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
@@ -399,6 +403,12 @@ def _record_lost_event(
399403
record_lost_func=_record_lost_event,
400404
)
401405

406+
self.span_batcher = None
407+
if has_span_streaming_enabled(self.options):
408+
self.span_batcher = SpanBatcher(
409+
capture_func=_capture_envelope, record_lost_func=_record_lost_event
410+
)
411+
402412
max_request_body_size = ("always", "never", "small", "medium")
403413
if self.options["max_request_body_size"] not in max_request_body_size:
404414
raise ValueError(
@@ -909,7 +919,10 @@ def capture_event(
909919
return return_value
910920

911921
def _capture_telemetry(
912-
self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
922+
self,
923+
telemetry: "Optional[Union[Log, Metric, StreamedSpan]]",
924+
ty: str,
925+
scope: "Scope",
913926
) -> None:
914927
# Capture attributes-based telemetry (logs, metrics, spansV2)
915928
if telemetry is None:
@@ -921,7 +934,7 @@ def _capture_telemetry(
921934
if ty == "log":
922935
before_send = get_before_send_log(self.options)
923936
elif ty == "metric":
924-
before_send = get_before_send_metric(self.options) # type: ignore
937+
before_send = get_before_send_metric(self.options)
925938

926939
if before_send is not None:
927940
telemetry = before_send(telemetry, {}) # type: ignore
@@ -933,7 +946,9 @@ def _capture_telemetry(
933946
if ty == "log":
934947
batcher = self.log_batcher
935948
elif ty == "metric":
936-
batcher = self.metrics_batcher # type: ignore
949+
batcher = self.metrics_batcher
950+
elif ty == "span":
951+
batcher = self.span_batcher
937952

938953
if batcher is not None:
939954
batcher.add(telemetry) # type: ignore
@@ -944,6 +959,9 @@ def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
944959
def _capture_metric(self, metric: "Optional[Metric]", scope: "Scope") -> None:
945960
self._capture_telemetry(metric, "metric", scope)
946961

962+
def _capture_span(self, span: "Optional[StreamedSpan]", scope: "Scope") -> None:
    """Hand ``span`` to the shared telemetry capture path as type ``"span"``."""
    self._capture_telemetry(telemetry=span, ty="span", scope=scope)
964+
947965
def capture_session(
948966
self,
949967
session: "Session",

sentry_sdk/consts.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from enum import Enum
33
from typing import TYPE_CHECKING
44

5+
from sentry_sdk._types import TraceLifecycleMode
6+
57
# up top to prevent circular import due to integration import
68
# This is more or less an arbitrary large-ish value for now, so that we allow
79
# pretty long strings (like LLM prompts), but still have *some* upper limit
@@ -82,6 +84,7 @@ class CompressionAlgo(Enum):
8284
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
8385
"enable_metrics": Optional[bool],
8486
"before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
87+
"trace_lifecycle": Optional[TraceLifecycleMode],
8588
},
8689
total=False,
8790
)
@@ -877,6 +880,8 @@ class SPANSTATUS:
877880
UNIMPLEMENTED = "unimplemented"
878881
UNKNOWN_ERROR = "unknown_error"
879882

883+
ERROR = "error" # span-first specific
884+
880885

881886
class OP:
882887
ANTHROPIC_MESSAGES_CREATE = "ai.messages.create.anthropic"

sentry_sdk/envelope.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ def data_category(self) -> "EventDataCategory":
253253
return "session"
254254
elif ty == "attachment":
255255
return "attachment"
256+
elif ty == "span":
257+
return "span"
256258
elif ty == "transaction":
257259
return "transaction"
258260
elif ty == "event":

sentry_sdk/scope.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
normalize_incoming_data,
3333
PropagationContext,
3434
)
35+
from sentry_sdk.trace import StreamedSpan
3536
from sentry_sdk.tracing import (
3637
BAGGAGE_HEADER_NAME,
3738
SENTRY_TRACE_HEADER_NAME,
@@ -1147,6 +1148,40 @@ def start_span(
11471148

11481149
return span
11491150

1151+
def start_streamed_span(
    self,
    name: str,
    attributes: "Optional[Attributes]" = None,
    parent_span: "Optional[StreamedSpan]" = None,
) -> "StreamedSpan":
    """Create a ``StreamedSpan`` bound to this scope.

    :param name: Name given to the new span.
    :param attributes: Optional attributes passed through to the span.
    :param parent_span: Explicit parent. When omitted, this scope's span is
        used, falling back to the isolation scope's span.
    :returns: A child of the resolved parent, or, when there is no parent,
        a new span whose ``trace_id`` comes from the active propagation
        context.
    """
    # TODO: rename to start_span once we drop the old API
    with new_scope():
        parent = parent_span
        if parent is None:
            # get current span or transaction
            parent = self.span or self.get_isolation_scope().span

        if parent is not None:
            # Children take propagation context from the parent span
            return StreamedSpan(
                name=name,
                attributes=attributes,
                trace_id=parent.trace_id,
                parent_span_id=parent.span_id,
                segment=parent.segment,
                scope=self,
            )

        # New spans get the `trace_id` from the scope
        active_context = self.get_active_propagation_context()
        return StreamedSpan(
            name=name,
            attributes=attributes,
            trace_id=active_context.trace_id,
            scope=self,
        )
1184+
11501185
def continue_trace(
11511186
self,
11521187
environ_or_headers: "Dict[str, Any]",
@@ -1180,6 +1215,9 @@ def continue_trace(
11801215
**optional_kwargs,
11811216
)
11821217

1218+
def set_propagation_context(self, environ_or_headers: "dict[str, Any]") -> None:
    """Pass ``environ_or_headers`` to ``generate_propagation_context``.

    Nothing is returned; any result of the delegated call is discarded.
    """
    self.generate_propagation_context(environ_or_headers)
1220+
11831221
def capture_event(
11841222
self,
11851223
event: "Event",

0 commit comments

Comments
 (0)