Commit 6039305

ref: Deduplicate batchers (#5263)
`LogBatcher` and `MetricBatcher` were essentially copies of each other, with much more in common than not. Create a parent `Batcher` class and keep the specific batchers minimal. They'll eventually get replaced by the telemetry processor, but we're stuck with the batchers for a bit longer, so we might as well make them nicer to work with. The upcoming span-first work will also build on this.
1 parent f2317dc commit 6039305
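
In sketch form, the refactor gives this class shape (an illustrative sketch, not verbatim from the diffs below; `MetricBatcher` is assumed to mirror `LogBatcher`, since its file is one of the three changed but is not reproduced here):

# Rough shape of the refactor (illustrative assumption, not the diff itself).
from typing import Generic, TypeVar

T = TypeVar("T")

class Batcher(Generic[T]): ...               # shared buffer, lock, fork-safe flush thread
class LogBatcher(Batcher["Log"]): ...        # TYPE = "log" plus log serialization
class MetricBatcher(Batcher["Metric"]): ...  # assumed metric counterpart (diff not shown)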

File tree

3 files changed (+190 / -259 lines)

sentry_sdk/_batcher.py

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
+import os
+import random
+import threading
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING, TypeVar, Generic
+
+from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute
+from sentry_sdk.envelope import Envelope, Item, PayloadRef
+
+if TYPE_CHECKING:
+    from typing import Optional, Callable, Any
+
+T = TypeVar("T")
+
+
+class Batcher(Generic[T]):
+    MAX_BEFORE_FLUSH = 100
+    MAX_BEFORE_DROP = 1_000
+    FLUSH_WAIT_TIME = 5.0
+
+    TYPE = ""
+    CONTENT_TYPE = ""
+
+    def __init__(
+        self,
+        capture_func: "Callable[[Envelope], None]",
+        record_lost_func: "Callable[..., None]",
+    ) -> None:
+        self._buffer: "list[T]" = []
+        self._capture_func = capture_func
+        self._record_lost_func = record_lost_func
+        self._running = True
+        self._lock = threading.Lock()
+
+        self._flush_event: "threading.Event" = threading.Event()
+
+        self._flusher: "Optional[threading.Thread]" = None
+        self._flusher_pid: "Optional[int]" = None
+
+    def _ensure_thread(self) -> bool:
+        """For forking processes we might need to restart this thread.
+        This ensures that our process actually has that thread running.
+        """
+        if not self._running:
+            return False
+
+        pid = os.getpid()
+        if self._flusher_pid == pid:
+            return True
+
+        with self._lock:
+            # Recheck to make sure another thread didn't get here and start
+            # the flusher in the meantime
+            if self._flusher_pid == pid:
+                return True
+
+            self._flusher_pid = pid
+
+            self._flusher = threading.Thread(target=self._flush_loop)
+            self._flusher.daemon = True
+
+            try:
+                self._flusher.start()
+            except RuntimeError:
+                # Unfortunately at this point the interpreter is in a state that no
+                # longer allows us to spawn a thread and we have to bail.
+                self._running = False
+                return False
+
+        return True
+
+    def _flush_loop(self) -> None:
+        while self._running:
+            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
+            self._flush_event.clear()
+            self._flush()
+
+    def add(self, item: "T") -> None:
+        if not self._ensure_thread() or self._flusher is None:
+            return None
+
+        with self._lock:
+            if len(self._buffer) >= self.MAX_BEFORE_DROP:
+                self._record_lost(item)
+                return None
+
+            self._buffer.append(item)
+            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
+                self._flush_event.set()
+
+    def kill(self) -> None:
+        if self._flusher is None:
+            return
+
+        self._running = False
+        self._flush_event.set()
+        self._flusher = None
+
+    def flush(self) -> None:
+        self._flush()
+
+    def _add_to_envelope(self, envelope: "Envelope") -> None:
+        envelope.add_item(
+            Item(
+                type=self.TYPE,
+                content_type=self.CONTENT_TYPE,
+                headers={
+                    "item_count": len(self._buffer),
+                },
+                payload=PayloadRef(
+                    json={
+                        "items": [
+                            self._to_transport_format(item) for item in self._buffer
+                        ]
+                    }
+                ),
+            )
+        )
+
+    def _flush(self) -> "Optional[Envelope]":
+        envelope = Envelope(
+            headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
+        )
+        with self._lock:
+            if len(self._buffer) == 0:
+                return None
+
+            self._add_to_envelope(envelope)
+            self._buffer.clear()
+
+        self._capture_func(envelope)
+        return envelope
+
+    def _record_lost(self, item: "T") -> None:
+        pass
+
+    @staticmethod
+    def _to_transport_format(item: "T") -> "Any":
+        pass
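
To make the contract concrete, here is a hypothetical minimal subclass; `ExampleBatcher`, its `TYPE`/`CONTENT_TYPE` values, and the `data_category` string are invented for illustration, and only the two overridden hooks come from the `Batcher` class above:

# Hypothetical subclass (illustrative only, not part of this commit).
# Buffering, locking, the fork-safe flush thread, and envelope assembly
# are all inherited from Batcher.
from typing import Any

from sentry_sdk._batcher import Batcher


class ExampleBatcher(Batcher[dict]):
    TYPE = "example_item"                  # invented item type
    CONTENT_TYPE = "application/json"      # invented content type

    @staticmethod
    def _to_transport_format(item: dict) -> Any:
        # Items in this toy example are already JSON-serializable.
        return item

    def _record_lost(self, item: dict) -> None:
        # Report the dropped item via the callback passed to __init__.
        self._record_lost_func(
            reason="queue_overflow",
            data_category="example_item",  # invented data category
            quantity=1,
        )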

sentry_sdk/_log_batcher.py

Lines changed: 35 additions & 143 deletions
@@ -1,164 +1,56 @@
-import os
-import random
-import threading
-from datetime import datetime, timezone
-from typing import Optional, List, Callable, TYPE_CHECKING, Any
+from typing import TYPE_CHECKING
 
-from sentry_sdk.utils import format_timestamp, safe_repr, serialize_attribute
+from sentry_sdk._batcher import Batcher
+from sentry_sdk.utils import serialize_attribute
 from sentry_sdk.envelope import Envelope, Item, PayloadRef
 
 if TYPE_CHECKING:
+    from typing import Any
     from sentry_sdk._types import Log
 
 
-class LogBatcher:
-    MAX_LOGS_BEFORE_FLUSH = 100
-    MAX_LOGS_BEFORE_DROP = 1_000
+class LogBatcher(Batcher["Log"]):
+    MAX_BEFORE_FLUSH = 100
+    MAX_BEFORE_DROP = 1_000
     FLUSH_WAIT_TIME = 5.0
 
-    def __init__(
-        self,
-        capture_func: "Callable[[Envelope], None]",
-        record_lost_func: "Callable[..., None]",
-    ) -> None:
-        self._log_buffer: "List[Log]" = []
-        self._capture_func = capture_func
-        self._record_lost_func = record_lost_func
-        self._running = True
-        self._lock = threading.Lock()
-
-        self._flush_event: "threading.Event" = threading.Event()
-
-        self._flusher: "Optional[threading.Thread]" = None
-        self._flusher_pid: "Optional[int]" = None
-
-    def _ensure_thread(self) -> bool:
-        """For forking processes we might need to restart this thread.
-        This ensures that our process actually has that thread running.
-        """
-        if not self._running:
-            return False
-
-        pid = os.getpid()
-        if self._flusher_pid == pid:
-            return True
-
-        with self._lock:
-            # Recheck to make sure another thread didn't get here and start
-            # the flusher in the meantime
-            if self._flusher_pid == pid:
-                return True
-
-            self._flusher_pid = pid
-
-            self._flusher = threading.Thread(target=self._flush_loop)
-            self._flusher.daemon = True
-
-            try:
-                self._flusher.start()
-            except RuntimeError:
-                # Unfortunately at this point the interpreter is in a state that no
-                # longer allows us to spawn a thread and we have to bail.
-                self._running = False
-                return False
-
-        return True
-
-    def _flush_loop(self) -> None:
-        while self._running:
-            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
-            self._flush_event.clear()
-            self._flush()
-
-    def add(
-        self,
-        log: "Log",
-    ) -> None:
-        if not self._ensure_thread() or self._flusher is None:
-            return None
-
-        with self._lock:
-            if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_DROP:
-                # Construct log envelope item without sending it to report lost bytes
-                log_item = Item(
-                    type="log",
-                    content_type="application/vnd.sentry.items.log+json",
-                    headers={
-                        "item_count": 1,
-                    },
-                    payload=PayloadRef(
-                        json={"items": [LogBatcher._log_to_transport_format(log)]}
-                    ),
-                )
-                self._record_lost_func(
-                    reason="queue_overflow",
-                    data_category="log_item",
-                    item=log_item,
-                    quantity=1,
-                )
-                return None
-
-            self._log_buffer.append(log)
-            if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH:
-                self._flush_event.set()
-
-    def kill(self) -> None:
-        if self._flusher is None:
-            return
-
-        self._running = False
-        self._flush_event.set()
-        self._flusher = None
-
-    def flush(self) -> None:
-        self._flush()
+    TYPE = "log"
+    CONTENT_TYPE = "application/vnd.sentry.items.log+json"
 
     @staticmethod
-    def _log_to_transport_format(log: "Log") -> "Any":
-        if "sentry.severity_number" not in log["attributes"]:
-            log["attributes"]["sentry.severity_number"] = log["severity_number"]
-        if "sentry.severity_text" not in log["attributes"]:
-            log["attributes"]["sentry.severity_text"] = log["severity_text"]
+    def _to_transport_format(item: "Log") -> "Any":
+        if "sentry.severity_number" not in item["attributes"]:
+            item["attributes"]["sentry.severity_number"] = item["severity_number"]
+        if "sentry.severity_text" not in item["attributes"]:
+            item["attributes"]["sentry.severity_text"] = item["severity_text"]
 
         res = {
-            "timestamp": int(log["time_unix_nano"]) / 1.0e9,
-            "trace_id": log.get("trace_id", "00000000-0000-0000-0000-000000000000"),
-            "span_id": log.get("span_id"),
-            "level": str(log["severity_text"]),
-            "body": str(log["body"]),
+            "timestamp": int(item["time_unix_nano"]) / 1.0e9,
+            "trace_id": item.get("trace_id", "00000000-0000-0000-0000-000000000000"),
+            "span_id": item.get("span_id"),
+            "level": str(item["severity_text"]),
+            "body": str(item["body"]),
             "attributes": {
-                k: serialize_attribute(v) for (k, v) in log["attributes"].items()
+                k: serialize_attribute(v) for (k, v) in item["attributes"].items()
            },
        }
 
        return res
 
-    def _flush(self) -> "Optional[Envelope]":
-        envelope = Envelope(
-            headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
+    def _record_lost(self, item: "Log") -> None:
+        # Construct log envelope item without sending it to report lost bytes
+        log_item = Item(
+            type=self.TYPE,
+            content_type=self.CONTENT_TYPE,
+            headers={
+                "item_count": 1,
+            },
+            payload=PayloadRef(json={"items": [self._to_transport_format(item)]}),
        )
-        with self._lock:
-            if len(self._log_buffer) == 0:
-                return None
 
-            envelope.add_item(
-                Item(
-                    type="log",
-                    content_type="application/vnd.sentry.items.log+json",
-                    headers={
-                        "item_count": len(self._log_buffer),
-                    },
-                    payload=PayloadRef(
-                        json={
-                            "items": [
-                                self._log_to_transport_format(log)
-                                for log in self._log_buffer
-                            ]
-                        }
-                    ),
-                )
-            )
-            self._log_buffer.clear()
-
-        self._capture_func(envelope)
-        return envelope
+        self._record_lost_func(
+            reason="queue_overflow",
+            data_category="log_item",
+            item=log_item,
+            quantity=1,
+        )
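
A short sketch of driving the shared `add()`/`flush()`/`kill()` API through `LogBatcher`; the two callbacks and the hand-built log dict are assumptions inferred from the field accesses in `_to_transport_format` above (the real SDK constructs logs and wires up these callbacks internally):

# Assumed usage sketch, not SDK-documented API.
import time

from sentry_sdk._log_batcher import LogBatcher
from sentry_sdk.envelope import Envelope


def capture(envelope: Envelope) -> None:
    # A real client would hand the envelope to the transport here.
    print("captured envelope with", len(envelope.items), "item(s)")


def record_lost(**kwargs) -> None:
    # Called with reason/data_category/item/quantity on queue overflow.
    print("lost:", kwargs)


batcher = LogBatcher(capture_func=capture, record_lost_func=record_lost)
batcher.add(
    {
        "time_unix_nano": time.time_ns(),
        "severity_number": 9,
        "severity_text": "info",
        "body": "hello from the batcher",
        "attributes": {},
    }
)
batcher.flush()  # force an envelope out instead of waiting FLUSH_WAIT_TIME
batcher.kill()   # stop the background flusher thread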

0 commit comments