From e6800397681e620733224779fdfda3408c975429 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Wed, 18 Mar 2026 14:22:39 -0700 Subject: [PATCH 1/2] Add fail_open parameter to Trace.span() and AsyncTrace.span() When fail_open=True, tracing errors during span setup or teardown are logged and suppressed instead of propagated. This ensures that tracing infrastructure failures never interrupt the caller's business logic. Without this, callers who want fault-tolerant tracing are forced to manually decompose the context manager protocol (__aenter__/__aexit__), which is error-prone and can silently swallow unrelated exceptions from the body of the with-block. --- src/agentex/lib/adk/_modules/tracing.py | 56 ++++++++++++++++--------- src/agentex/lib/core/tracing/trace.py | 53 ++++++++++++++++++----- 2 files changed, 78 insertions(+), 31 deletions(-) diff --git a/src/agentex/lib/adk/_modules/tracing.py b/src/agentex/lib/adk/_modules/tracing.py index cb3b4b22b..afa3b4a0a 100644 --- a/src/agentex/lib/adk/_modules/tracing.py +++ b/src/agentex/lib/adk/_modules/tracing.py @@ -64,9 +64,7 @@ def _tracing_service(self) -> TracingService: # Re-create the underlying httpx client when the event loop changes # (e.g. between HTTP requests in a sync ASGI server) to avoid # "Event loop is closed" / "bound to a different event loop" errors. - if self._tracing_service_lazy is None or ( - loop_id is not None and loop_id != self._bound_loop_id - ): + if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id): import httpx # Disable keepalive so each span HTTP call gets a fresh TCP @@ -96,6 +94,7 @@ async def span( start_to_close_timeout: timedelta = timedelta(seconds=5), heartbeat_timeout: timedelta = timedelta(seconds=5), retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY, + fail_open: bool = False, ) -> AsyncGenerator[Span | None, None]: """ Async context manager for creating and automatically closing a span. @@ -103,6 +102,10 @@ async def span( If trace_id is falsy, acts as a no-op context manager. + When fail_open is True, tracing errors during span setup or teardown are + logged and suppressed instead of propagated. This ensures that tracing + infrastructure failures never interrupt the caller's business logic. + Args: trace_id (str): The trace ID for the span. name (str): The name of the span. @@ -112,6 +115,7 @@ async def span( start_to_close_timeout (timedelta): The start to close timeout for the span. heartbeat_timeout (timedelta): The heartbeat timeout for the span. retry_policy (RetryPolicy): The retry policy for the span. + fail_open (bool): If True, suppress tracing errors instead of raising. Returns: AsyncGenerator[Optional[Span], None]: An async generator that yields the started span object. @@ -120,27 +124,39 @@ async def span( yield None return - span: Span | None = await self.start_span( - trace_id=trace_id, - name=name, - input=input, - parent_id=parent_id, - data=data, - start_to_close_timeout=start_to_close_timeout, - heartbeat_timeout=heartbeat_timeout, - retry_policy=retry_policy, - ) + try: + span: Span | None = await self.start_span( + trace_id=trace_id, + name=name, + input=input, + parent_id=parent_id, + data=data, + start_to_close_timeout=start_to_close_timeout, + heartbeat_timeout=heartbeat_timeout, + retry_policy=retry_policy, + ) + except Exception: + if fail_open: + logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True) + yield None + return + raise try: yield span finally: if span: - await self.end_span( - trace_id=trace_id, - span=span, - start_to_close_timeout=start_to_close_timeout, - heartbeat_timeout=heartbeat_timeout, - retry_policy=retry_policy, - ) + try: + await self.end_span( + trace_id=trace_id, + span=span, + start_to_close_timeout=start_to_close_timeout, + heartbeat_timeout=heartbeat_timeout, + retry_policy=retry_policy, + ) + except Exception: + if not fail_open: + raise + logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True) async def start_span( self, diff --git a/src/agentex/lib/core/tracing/trace.py b/src/agentex/lib/core/tracing/trace.py index 2ba1d489e..38bdf6b2a 100644 --- a/src/agentex/lib/core/tracing/trace.py +++ b/src/agentex/lib/core/tracing/trace.py @@ -146,19 +146,36 @@ def span( parent_id: str | None = None, input: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None, data: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None, + fail_open: bool = False, ): """ Context manager for spans. If trace_id is falsy, acts as a no-op context manager. + + When fail_open is True, tracing errors during span setup or teardown are + logged and suppressed instead of propagated. This ensures that tracing + infrastructure failures never interrupt the caller's business logic. """ if not self.trace_id: yield None return - span = self.start_span(name, parent_id, input, data) + try: + span = self.start_span(name, parent_id, input, data) + except Exception: + if fail_open: + logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True) + yield None + return + raise try: yield span finally: - self.end_span(span) + try: + self.end_span(span) + except Exception: + if not fail_open: + raise + logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True) class AsyncTrace: @@ -225,9 +242,7 @@ async def start_span( ) if self.processors: - await asyncio.gather( - *[processor.on_span_start(span) for processor in self.processors] - ) + await asyncio.gather(*[processor.on_span_start(span) for processor in self.processors]) return span @@ -252,9 +267,7 @@ async def end_span( span.data = recursive_model_dump(span.data) if span.data else None if self.processors: - await asyncio.gather( - *[processor.on_span_end(span) for processor in self.processors] - ) + await asyncio.gather(*[processor.on_span_end(span) for processor in self.processors]) return span @@ -290,24 +303,42 @@ async def span( parent_id: str | None = None, input: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None, data: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None, + fail_open: bool = False, ) -> AsyncGenerator[Span | None, None]: """ Context manager for spans. + When fail_open is True, tracing errors during span setup or teardown are + logged and suppressed instead of propagated. This ensures that tracing + infrastructure failures never interrupt the caller's business logic. + Args: name: Name of the span. parent_id: Optional parent span ID. input: Optional input data for the span. data: Optional additional data for the span. + fail_open: If True, suppress tracing errors instead of raising. Yields: - The span object. + The span object, or None if fail_open is True and setup failed. """ if not self.trace_id: yield None return - span = await self.start_span(name, parent_id, input, data) + try: + span = await self.start_span(name, parent_id, input, data) + except Exception: + if fail_open: + logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True) + yield None + return + raise try: yield span finally: - await self.end_span(span) + try: + await self.end_span(span) + except Exception: + if not fail_open: + raise + logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True) From d36f1647247163ab3331fd77b0cf1ade901f6ac5 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Wed, 18 Mar 2026 14:22:44 -0700 Subject: [PATCH 2/2] Add tests for span fail_open behavior Tests cover both sync and async variants: setup failures, teardown failures, body exception propagation, and the critical property that fail_open never swallows exceptions from the caller's code. --- tests/lib/core/__init__.py | 0 tests/lib/core/tracing/__init__.py | 0 tests/lib/core/tracing/test_span_fail_open.py | 192 ++++++++++++++++++ 3 files changed, 192 insertions(+) create mode 100644 tests/lib/core/__init__.py create mode 100644 tests/lib/core/tracing/__init__.py create mode 100644 tests/lib/core/tracing/test_span_fail_open.py diff --git a/tests/lib/core/__init__.py b/tests/lib/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/__init__.py b/tests/lib/core/tracing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/test_span_fail_open.py b/tests/lib/core/tracing/test_span_fail_open.py new file mode 100644 index 000000000..764bcdee1 --- /dev/null +++ b/tests/lib/core/tracing/test_span_fail_open.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +from typing import Any +from unittest.mock import Mock, AsyncMock + +import pytest + +from agentex.lib.core.tracing.trace import Trace, AsyncTrace + + +class FakeSyncProcessor: + def __init__(self, *, fail_on_start: bool = False, fail_on_end: bool = False): + self.fail_on_start = fail_on_start + self.fail_on_end = fail_on_end + self.started: list[Any] = [] + self.ended: list[Any] = [] + + def on_span_start(self, span: Any) -> None: + self.started.append(span) + if self.fail_on_start: + raise ConnectionError("tracing backend unavailable") + + def on_span_end(self, span: Any) -> None: + self.ended.append(span) + if self.fail_on_end: + raise ConnectionError("tracing backend unavailable") + + +class FakeAsyncProcessor: + def __init__(self, *, fail_on_start: bool = False, fail_on_end: bool = False): + self.fail_on_start = fail_on_start + self.fail_on_end = fail_on_end + self.started: list[Any] = [] + self.ended: list[Any] = [] + + async def on_span_start(self, span: Any) -> None: + self.started.append(span) + if self.fail_on_start: + raise ConnectionError("tracing backend unavailable") + + async def on_span_end(self, span: Any) -> None: + self.ended.append(span) + if self.fail_on_end: + raise ConnectionError("tracing backend unavailable") + + +# --------------------------------------------------------------------------- +# Sync Trace tests +# --------------------------------------------------------------------------- + + +class TestSyncSpanFailOpen: + def _make_trace(self, processor: FakeSyncProcessor) -> Trace: + return Trace( + processors=[processor], # type: ignore[list-item] + client=Mock(), + trace_id="test-trace", + ) + + def test_default_propagates_start_error(self) -> None: + proc = FakeSyncProcessor(fail_on_start=True) + trace = self._make_trace(proc) + with pytest.raises(ConnectionError): + with trace.span(name="test"): + pass + + def test_default_propagates_end_error(self) -> None: + proc = FakeSyncProcessor(fail_on_end=True) + trace = self._make_trace(proc) + with pytest.raises(ConnectionError): + with trace.span(name="test"): + pass + + def test_fail_open_suppresses_start_error(self) -> None: + proc = FakeSyncProcessor(fail_on_start=True) + trace = self._make_trace(proc) + with trace.span(name="test", fail_open=True) as span: + assert span is None + + def test_fail_open_suppresses_end_error(self) -> None: + proc = FakeSyncProcessor(fail_on_end=True) + trace = self._make_trace(proc) + with trace.span(name="test", fail_open=True) as span: + assert span is not None + + def test_fail_open_does_not_swallow_body_exception(self) -> None: + """The critical property: exceptions from the caller's code must propagate.""" + proc = FakeSyncProcessor() + trace = self._make_trace(proc) + with pytest.raises(ValueError, match="business logic error"): + with trace.span(name="test", fail_open=True): + raise ValueError("business logic error") + + def test_fail_open_body_exception_with_end_error(self) -> None: + """Body exception propagates even when end_span also fails.""" + proc = FakeSyncProcessor(fail_on_end=True) + trace = self._make_trace(proc) + with pytest.raises(ValueError, match="business logic error"): + with trace.span(name="test", fail_open=True): + raise ValueError("business logic error") + + def test_happy_path_unchanged(self) -> None: + proc = FakeSyncProcessor() + trace = self._make_trace(proc) + with trace.span(name="test") as span: + assert span is not None + assert span.name == "test" + assert len(proc.started) == 1 + assert len(proc.ended) == 1 + + def test_no_trace_id_yields_none(self) -> None: + trace = Trace(processors=[], client=Mock(), trace_id=None) + with trace.span(name="test", fail_open=True) as span: + assert span is None + + +# --------------------------------------------------------------------------- +# Async Trace tests +# --------------------------------------------------------------------------- + + +class TestAsyncSpanFailOpen: + def _make_trace(self, processor: FakeAsyncProcessor) -> AsyncTrace: + return AsyncTrace( + processors=[processor], # type: ignore[list-item] + client=AsyncMock(), + trace_id="test-trace", + ) + + @pytest.mark.asyncio + async def test_default_propagates_start_error(self) -> None: + proc = FakeAsyncProcessor(fail_on_start=True) + trace = self._make_trace(proc) + with pytest.raises(ConnectionError): + async with trace.span(name="test"): + pass + + @pytest.mark.asyncio + async def test_default_propagates_end_error(self) -> None: + proc = FakeAsyncProcessor(fail_on_end=True) + trace = self._make_trace(proc) + with pytest.raises(ConnectionError): + async with trace.span(name="test"): + pass + + @pytest.mark.asyncio + async def test_fail_open_suppresses_start_error(self) -> None: + proc = FakeAsyncProcessor(fail_on_start=True) + trace = self._make_trace(proc) + async with trace.span(name="test", fail_open=True) as span: + assert span is None + + @pytest.mark.asyncio + async def test_fail_open_suppresses_end_error(self) -> None: + proc = FakeAsyncProcessor(fail_on_end=True) + trace = self._make_trace(proc) + async with trace.span(name="test", fail_open=True) as span: + assert span is not None + + @pytest.mark.asyncio + async def test_fail_open_does_not_swallow_body_exception(self) -> None: + """The critical property: exceptions from the caller's code must propagate.""" + proc = FakeAsyncProcessor() + trace = self._make_trace(proc) + with pytest.raises(ValueError, match="business logic error"): + async with trace.span(name="test", fail_open=True): + raise ValueError("business logic error") + + @pytest.mark.asyncio + async def test_fail_open_body_exception_with_end_error(self) -> None: + """Body exception propagates even when end_span also fails.""" + proc = FakeAsyncProcessor(fail_on_end=True) + trace = self._make_trace(proc) + with pytest.raises(ValueError, match="business logic error"): + async with trace.span(name="test", fail_open=True): + raise ValueError("business logic error") + + @pytest.mark.asyncio + async def test_happy_path_unchanged(self) -> None: + proc = FakeAsyncProcessor() + trace = self._make_trace(proc) + async with trace.span(name="test") as span: + assert span is not None + assert span.name == "test" + assert len(proc.started) == 1 + assert len(proc.ended) == 1 + + @pytest.mark.asyncio + async def test_no_trace_id_yields_none(self) -> None: + trace = AsyncTrace(processors=[], client=AsyncMock(), trace_id=None) + async with trace.span(name="test", fail_open=True) as span: + assert span is None