Skip to content

Commit 14dce08

Browse files
rustyconoverclaude
andcommitted
Add OTel spans for external data upload/fetch and fix input stats
- Instrument external storage upload and fetch with OpenTelemetry spans including URL sanitization to prevent credential leakage - Fix OTel context propagation to use clean root context per request - Exclude producer tick batches from input_batches call statistics Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 09528c6 commit 14dce08

7 files changed

Lines changed: 310 additions & 13 deletions

File tree

tests/test_call_statistics.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,9 @@ def test_producer_stream_stats(self, caplog: pytest.LogCaptureFixture) -> None:
308308
assert len(access_records) >= 1
309309
record = access_records[0]
310310
assert _extra(record, "status") == "ok"
311-
# 3 ticks + 1 final tick (for finish) = 4 input batches (ticks)
312-
assert _extra(record, "input_batches") >= 4 # params + ticks
311+
# Tick batches (producer protocol artifacts) are excluded from input stats;
312+
# only the initial params batch is counted.
313+
assert _extra(record, "input_batches") == 1 # params only, no ticks
313314
# 3 data batches from producer
314315
assert _extra(record, "output_batches") >= 3
315316

tests/test_otel.py

Lines changed: 214 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66
from __future__ import annotations
77

88
import threading
9+
from collections.abc import Iterator
10+
from contextlib import contextmanager
911
from dataclasses import dataclass
12+
from io import BytesIO
1013
from typing import Protocol, cast
1114

1215
import pyarrow as pa
1316
import pytest
17+
from aioresponses import aioresponses as aioresponses_ctx
1418
from opentelemetry.metrics import MeterProvider
1519
from opentelemetry.sdk.metrics import MeterProvider as SdkMeterProvider
1620
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
@@ -19,7 +23,16 @@
1923
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
2024
from opentelemetry.trace import SpanKind, StatusCode
2125
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
22-
26+
from pyarrow import ipc
27+
28+
from vgi_rpc.external import (
29+
ExternalLocationConfig,
30+
ExternalStorage,
31+
make_external_location_batch,
32+
maybe_externalize_batch,
33+
resolve_external_location,
34+
)
35+
from vgi_rpc.external_fetch import FetchConfig
2336
from vgi_rpc.http import http_connect, make_sync_client
2437
from vgi_rpc.otel import OtelConfig, instrument_server
2538
from vgi_rpc.rpc import (
@@ -1016,3 +1029,203 @@ def _bad_auth(req: object) -> AuthContext:
10161029
if attrs.get("deployment.environment") == "test":
10171030
found_env = True
10181031
assert found_env, "custom_attributes not found on auth failure counter"
1032+
1033+
1034+
# ---------------------------------------------------------------------------
1035+
# External data OTel span tests
1036+
# ---------------------------------------------------------------------------
1037+
1038+
_EXT_SCHEMA = pa.schema([pa.field("value", pa.int64())])
1039+
1040+
1041+
class _MockStorage(ExternalStorage):
1042+
"""In-memory ExternalStorage for OTel tests."""
1043+
1044+
def __init__(self) -> None:
1045+
"""Initialize with empty data store."""
1046+
self.data: dict[str, bytes] = {}
1047+
self._counter = 0
1048+
1049+
def upload(self, data: bytes, schema: pa.Schema, *, content_encoding: str | None = None) -> str:
1050+
"""Store data and return a mock URL."""
1051+
self._counter += 1
1052+
url = f"https://mock.storage/{self._counter}"
1053+
self.data[url] = data
1054+
return url
1055+
1056+
1057+
def _serialize_ipc(
1058+
schema: pa.Schema,
1059+
batches: list[tuple[pa.RecordBatch, pa.KeyValueMetadata | None]],
1060+
) -> bytes:
1061+
"""Serialize a list of (batch, custom_metadata) into IPC stream bytes."""
1062+
buf = BytesIO()
1063+
with ipc.new_stream(buf, schema) as writer:
1064+
for batch, cm in batches:
1065+
if cm is not None:
1066+
writer.write_batch(batch, custom_metadata=cm)
1067+
else:
1068+
writer.write_batch(batch)
1069+
return buf.getvalue()
1070+
1071+
1072+
@contextmanager
1073+
def _mock_aio(storage: _MockStorage) -> Iterator[aioresponses_ctx]:
1074+
"""Register all MockStorage URLs in aioresponses."""
1075+
with aioresponses_ctx() as mock:
1076+
for url, body in storage.data.items():
1077+
headers = {"Content-Length": str(len(body))}
1078+
mock.head(url, headers=headers)
1079+
mock.get(url, body=body, headers=headers)
1080+
yield mock
1081+
1082+
1083+
class TestExternalOtel:
1084+
"""OTel span tests for external data upload and fetch."""
1085+
1086+
@pytest.fixture(autouse=True)
1087+
def _patch_tracer(
1088+
self,
1089+
otel_providers: tuple[TracerProvider, SdkMeterProvider, InMemorySpanExporter, InMemoryMetricReader],
1090+
monkeypatch: pytest.MonkeyPatch,
1091+
) -> None:
1092+
"""Patch external.py to use the test TracerProvider."""
1093+
import vgi_rpc.external as _ext_mod
1094+
1095+
tracer_provider = otel_providers[0]
1096+
1097+
class _PatchedTrace:
1098+
"""Minimal trace module proxy that uses the test TracerProvider."""
1099+
1100+
SpanKind = SpanKind
1101+
StatusCode = StatusCode
1102+
1103+
@staticmethod
1104+
def get_tracer(name: str, version: str = "") -> object:
1105+
"""Return a tracer from the test provider."""
1106+
return tracer_provider.get_tracer(name, version)
1107+
1108+
monkeypatch.setattr(_ext_mod, "_otel_trace", _PatchedTrace)
1109+
monkeypatch.setattr(_ext_mod, "_HAS_OTEL", True)
1110+
1111+
def test_upload_span(
1112+
self,
1113+
otel_providers: tuple[TracerProvider, SdkMeterProvider, InMemorySpanExporter, InMemoryMetricReader],
1114+
) -> None:
1115+
"""Upload creates a span with correct attributes."""
1116+
_, _, exporter, _ = otel_providers
1117+
storage = _MockStorage()
1118+
config = ExternalLocationConfig(storage=storage, externalize_threshold_bytes=1)
1119+
1120+
batch = pa.RecordBatch.from_pydict({"value": [42]}, schema=_EXT_SCHEMA)
1121+
maybe_externalize_batch(batch, None, config)
1122+
1123+
spans = exporter.get_finished_spans()
1124+
upload_spans = [s for s in spans if s.name == "vgi_rpc.external/upload"]
1125+
assert len(upload_spans) == 1
1126+
span = upload_spans[0]
1127+
assert span.kind == SpanKind.CLIENT
1128+
attrs = dict(span.attributes or {})
1129+
assert attrs["url"] == "https://mock.storage/1"
1130+
assert isinstance(attrs["upload_bytes"], int) and attrs["upload_bytes"] > 0
1131+
assert "compression" not in attrs
1132+
1133+
def test_upload_span_with_compression(
1134+
self,
1135+
otel_providers: tuple[TracerProvider, SdkMeterProvider, InMemorySpanExporter, InMemoryMetricReader],
1136+
) -> None:
1137+
"""Upload span includes compression and original_bytes when compressed."""
1138+
_, _, exporter, _ = otel_providers
1139+
from vgi_rpc.external import Compression
1140+
1141+
storage = _MockStorage()
1142+
config = ExternalLocationConfig(
1143+
storage=storage,
1144+
externalize_threshold_bytes=1,
1145+
compression=Compression(),
1146+
)
1147+
1148+
batch = pa.RecordBatch.from_pydict({"value": [42]}, schema=_EXT_SCHEMA)
1149+
maybe_externalize_batch(batch, None, config)
1150+
1151+
spans = exporter.get_finished_spans()
1152+
upload_spans = [s for s in spans if s.name == "vgi_rpc.external/upload"]
1153+
assert len(upload_spans) == 1
1154+
attrs = dict(upload_spans[0].attributes or {})
1155+
assert attrs["compression"] == "zstd"
1156+
assert isinstance(attrs["original_bytes"], int) and attrs["original_bytes"] > 0
1157+
1158+
def test_fetch_span(
1159+
self,
1160+
otel_providers: tuple[TracerProvider, SdkMeterProvider, InMemorySpanExporter, InMemoryMetricReader],
1161+
) -> None:
1162+
"""Fetch creates a span with correct attributes."""
1163+
_, _, exporter, _ = otel_providers
1164+
storage = _MockStorage()
1165+
data_batch = pa.RecordBatch.from_pydict({"value": [99]}, schema=_EXT_SCHEMA)
1166+
ipc_bytes = _serialize_ipc(_EXT_SCHEMA, [(data_batch, None)])
1167+
url = "https://mock.storage/fetch1"
1168+
storage.data[url] = ipc_bytes
1169+
1170+
pointer, cm = make_external_location_batch(_EXT_SCHEMA, url)
1171+
config = ExternalLocationConfig(
1172+
storage=storage,
1173+
url_validator=None,
1174+
fetch_config=FetchConfig(parallel_threshold_bytes=len(ipc_bytes) + 1000),
1175+
)
1176+
1177+
with _mock_aio(storage):
1178+
resolved, _ = resolve_external_location(pointer, cm, config)
1179+
1180+
assert resolved.num_rows == 1
1181+
spans = exporter.get_finished_spans()
1182+
fetch_spans = [s for s in spans if s.name == "vgi_rpc.external/fetch"]
1183+
assert len(fetch_spans) == 1
1184+
span = fetch_spans[0]
1185+
assert span.kind == SpanKind.CLIENT
1186+
assert span.status.status_code == StatusCode.OK
1187+
attrs = dict(span.attributes or {})
1188+
assert attrs["url"] == "https://mock.storage/fetch1"
1189+
assert isinstance(attrs["fetch_duration_ms"], float) and attrs["fetch_duration_ms"] >= 0
1190+
1191+
def test_fetch_span_error(
1192+
self,
1193+
otel_providers: tuple[TracerProvider, SdkMeterProvider, InMemorySpanExporter, InMemoryMetricReader],
1194+
) -> None:
1195+
"""Fetch span records error status on failure."""
1196+
_, _, exporter, _ = otel_providers
1197+
url = "https://mock.storage/fail"
1198+
pointer, cm = make_external_location_batch(_EXT_SCHEMA, url)
1199+
config = ExternalLocationConfig(
1200+
url_validator=None,
1201+
max_retries=0,
1202+
fetch_config=FetchConfig(parallel_threshold_bytes=999999),
1203+
)
1204+
1205+
with aioresponses_ctx() as mock:
1206+
mock.head(url, exception=OSError("connection refused"))
1207+
mock.get(url, exception=OSError("connection refused"))
1208+
with pytest.raises(RuntimeError, match="Failed to resolve"):
1209+
resolve_external_location(pointer, cm, config)
1210+
1211+
spans = exporter.get_finished_spans()
1212+
fetch_spans = [s for s in spans if s.name == "vgi_rpc.external/fetch"]
1213+
assert len(fetch_spans) == 1
1214+
span = fetch_spans[0]
1215+
assert span.status.status_code == StatusCode.ERROR
1216+
event_names = [e.name for e in span.events]
1217+
assert "exception" in event_names
1218+
1219+
def test_upload_span_sanitizes_url(
1220+
self,
1221+
otel_providers: tuple[TracerProvider, SdkMeterProvider, InMemorySpanExporter, InMemoryMetricReader],
1222+
) -> None:
1223+
"""Upload span strips query params from URL to avoid credential leakage."""
1224+
from vgi_rpc.external import _sanitize_url
1225+
1226+
assert _sanitize_url("https://bucket.s3.amazonaws.com/key?X-Amz-Credential=AKIA") == (
1227+
"https://bucket.s3.amazonaws.com/key"
1228+
)
1229+
assert _sanitize_url("https://storage.googleapis.com/b/o?token=secret") == (
1230+
"https://storage.googleapis.com/b/o"
1231+
)

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)