diff --git a/src/agentex/lib/adk/_modules/tracing.py b/src/agentex/lib/adk/_modules/tracing.py
index cb3b4b22b..afa3b4a0a 100644
--- a/src/agentex/lib/adk/_modules/tracing.py
+++ b/src/agentex/lib/adk/_modules/tracing.py
@@ -64,9 +64,7 @@ def _tracing_service(self) -> TracingService:
         # Re-create the underlying httpx client when the event loop changes
         # (e.g. between HTTP requests in a sync ASGI server) to avoid
         # "Event loop is closed" / "bound to a different event loop" errors.
-        if self._tracing_service_lazy is None or (
-            loop_id is not None and loop_id != self._bound_loop_id
-        ):
+        if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id):
             import httpx
 
             # Disable keepalive so each span HTTP call gets a fresh TCP
@@ -96,6 +94,7 @@ async def span(
         start_to_close_timeout: timedelta = timedelta(seconds=5),
         heartbeat_timeout: timedelta = timedelta(seconds=5),
         retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
+        fail_open: bool = False,
     ) -> AsyncGenerator[Span | None, None]:
         """
         Async context manager for creating and automatically closing a span.
@@ -103,6 +102,10 @@ async def span(
 
         If trace_id is falsy, acts as a no-op context manager.
 
+        When fail_open is True, tracing errors during span setup or teardown are
+        logged and suppressed instead of propagated. This ensures that tracing
+        infrastructure failures never interrupt the caller's business logic.
+
         Args:
             trace_id (str): The trace ID for the span.
             name (str): The name of the span.
@@ -112,6 +115,7 @@ async def span(
             start_to_close_timeout (timedelta): The start to close timeout for the span.
             heartbeat_timeout (timedelta): The heartbeat timeout for the span.
             retry_policy (RetryPolicy): The retry policy for the span.
+            fail_open (bool): If True, suppress tracing errors instead of raising.
 
         Returns:
             AsyncGenerator[Optional[Span], None]: An async generator that yields the started span object.
@@ -120,27 +124,41 @@ async def span(
             yield None
             return
 
-        span: Span | None = await self.start_span(
-            trace_id=trace_id,
-            name=name,
-            input=input,
-            parent_id=parent_id,
-            data=data,
-            start_to_close_timeout=start_to_close_timeout,
-            heartbeat_timeout=heartbeat_timeout,
-            retry_policy=retry_policy,
-        )
+        try:
+            span: Span | None = await self.start_span(
+                trace_id=trace_id,
+                name=name,
+                input=input,
+                parent_id=parent_id,
+                data=data,
+                start_to_close_timeout=start_to_close_timeout,
+                heartbeat_timeout=heartbeat_timeout,
+                retry_policy=retry_policy,
+            )
+        except Exception:
+            if not fail_open:
+                raise
+            logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True)
+            # Leave this handler before yielding: a yield inside `except`
+            # would chain the suppressed tracing error onto any exception
+            # raised by the caller's body and keep its traceback alive.
+            span = None
         try:
             yield span
         finally:
             if span:
-                await self.end_span(
-                    trace_id=trace_id,
-                    span=span,
-                    start_to_close_timeout=start_to_close_timeout,
-                    heartbeat_timeout=heartbeat_timeout,
-                    retry_policy=retry_policy,
-                )
+                try:
+                    await self.end_span(
+                        trace_id=trace_id,
+                        span=span,
+                        start_to_close_timeout=start_to_close_timeout,
+                        heartbeat_timeout=heartbeat_timeout,
+                        retry_policy=retry_policy,
+                    )
+                except Exception:
+                    if not fail_open:
+                        raise
+                    logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True)
 
     async def start_span(
         self,
diff --git a/src/agentex/lib/core/tracing/trace.py b/src/agentex/lib/core/tracing/trace.py
index 2ba1d489e..38bdf6b2a 100644
--- a/src/agentex/lib/core/tracing/trace.py
+++ b/src/agentex/lib/core/tracing/trace.py
@@ -146,19 +146,39 @@ def span(
         parent_id: str | None = None,
         input: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
         data: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
+        fail_open: bool = False,
     ):
         """
         Context manager for spans.
         If trace_id is falsy, acts as a no-op context manager.
+
+        When fail_open is True, tracing errors during span setup or teardown are
+        logged and suppressed instead of propagated. This ensures that tracing
+        infrastructure failures never interrupt the caller's business logic.
         """
         if not self.trace_id:
             yield None
             return
-        span = self.start_span(name, parent_id, input, data)
+        try:
+            span = self.start_span(name, parent_id, input, data)
+        except Exception:
+            if not fail_open:
+                raise
+            logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True)
+            # Leave this handler before yielding: a yield inside `except`
+            # would chain the suppressed tracing error onto any exception
+            # raised by the caller's body and keep its traceback alive.
+            span = None
         try:
             yield span
         finally:
-            self.end_span(span)
+            if span is not None:
+                try:
+                    self.end_span(span)
+                except Exception:
+                    if not fail_open:
+                        raise
+                    logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True)
 
 
 class AsyncTrace:
@@ -225,9 +245,7 @@ async def start_span(
         )
 
         if self.processors:
-            await asyncio.gather(
-                *[processor.on_span_start(span) for processor in self.processors]
-            )
+            await asyncio.gather(*[processor.on_span_start(span) for processor in self.processors])
 
         return span
 
@@ -252,9 +270,7 @@ async def end_span(
         span.data = recursive_model_dump(span.data) if span.data else None
 
         if self.processors:
-            await asyncio.gather(
-                *[processor.on_span_end(span) for processor in self.processors]
-            )
+            await asyncio.gather(*[processor.on_span_end(span) for processor in self.processors])
 
         return span
 
@@ -290,24 +306,45 @@ async def span(
         parent_id: str | None = None,
         input: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
         data: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
+        fail_open: bool = False,
     ) -> AsyncGenerator[Span | None, None]:
         """
         Context manager for spans.
 
+        When fail_open is True, tracing errors during span setup or teardown are
+        logged and suppressed instead of propagated. This ensures that tracing
+        infrastructure failures never interrupt the caller's business logic.
+
         Args:
             name: Name of the span.
             parent_id: Optional parent span ID.
             input: Optional input data for the span.
             data: Optional additional data for the span.
+            fail_open: If True, suppress tracing errors instead of raising.
 
         Yields:
-            The span object.
+            The span object, or None when trace_id is falsy or when fail_open is True and setup failed.
         """
         if not self.trace_id:
             yield None
             return
-        span = await self.start_span(name, parent_id, input, data)
+        try:
+            span = await self.start_span(name, parent_id, input, data)
+        except Exception:
+            if not fail_open:
+                raise
+            logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True)
+            # Leave this handler before yielding: a yield inside `except`
+            # would chain the suppressed tracing error onto any exception
+            # raised by the caller's body and keep its traceback alive.
+            span = None
         try:
             yield span
         finally:
-            await self.end_span(span)
+            if span is not None:
+                try:
+                    await self.end_span(span)
+                except Exception:
+                    if not fail_open:
+                        raise
+                    logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True)
diff --git a/tests/lib/core/__init__.py b/tests/lib/core/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/lib/core/tracing/__init__.py b/tests/lib/core/tracing/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/lib/core/tracing/test_span_fail_open.py b/tests/lib/core/tracing/test_span_fail_open.py
new file mode 100644
index 000000000..764bcdee1
--- /dev/null
+++ b/tests/lib/core/tracing/test_span_fail_open.py
@@ -0,0 +1,215 @@
+from __future__ import annotations
+
+from typing import Any
+from unittest.mock import Mock, AsyncMock
+
+import pytest
+
+from agentex.lib.core.tracing.trace import Trace, AsyncTrace
+
+
+class FakeSyncProcessor:
+    def __init__(self, *, fail_on_start: bool = False, fail_on_end: bool = False):
+        self.fail_on_start = fail_on_start
+        self.fail_on_end = fail_on_end
+        self.started: list[Any] = []
+        self.ended: list[Any] = []
+
+    def on_span_start(self, span: Any) -> None:
+        self.started.append(span)
+        if self.fail_on_start:
+            raise ConnectionError("tracing backend unavailable")
+
+    def on_span_end(self, span: Any) -> None:
+        self.ended.append(span)
+        if self.fail_on_end:
+            raise ConnectionError("tracing backend unavailable")
+
+
+class FakeAsyncProcessor:
+    def __init__(self, *, fail_on_start: bool = False, fail_on_end: bool = False):
+        self.fail_on_start = fail_on_start
+        self.fail_on_end = fail_on_end
+        self.started: list[Any] = []
+        self.ended: list[Any] = []
+
+    async def on_span_start(self, span: Any) -> None:
+        self.started.append(span)
+        if self.fail_on_start:
+            raise ConnectionError("tracing backend unavailable")
+
+    async def on_span_end(self, span: Any) -> None:
+        self.ended.append(span)
+        if self.fail_on_end:
+            raise ConnectionError("tracing backend unavailable")
+
+
+# ---------------------------------------------------------------------------
+# Sync Trace tests
+# ---------------------------------------------------------------------------
+
+
+class TestSyncSpanFailOpen:
+    def _make_trace(self, processor: FakeSyncProcessor) -> Trace:
+        return Trace(
+            processors=[processor],  # type: ignore[list-item]
+            client=Mock(),
+            trace_id="test-trace",
+        )
+
+    def test_default_propagates_start_error(self) -> None:
+        proc = FakeSyncProcessor(fail_on_start=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ConnectionError):
+            with trace.span(name="test"):
+                pass
+
+    def test_default_propagates_end_error(self) -> None:
+        proc = FakeSyncProcessor(fail_on_end=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ConnectionError):
+            with trace.span(name="test"):
+                pass
+
+    def test_fail_open_suppresses_start_error(self) -> None:
+        proc = FakeSyncProcessor(fail_on_start=True)
+        trace = self._make_trace(proc)
+        with trace.span(name="test", fail_open=True) as span:
+            assert span is None
+
+    def test_fail_open_suppresses_end_error(self) -> None:
+        proc = FakeSyncProcessor(fail_on_end=True)
+        trace = self._make_trace(proc)
+        with trace.span(name="test", fail_open=True) as span:
+            assert span is not None
+
+    def test_fail_open_does_not_swallow_body_exception(self) -> None:
+        """The critical property: exceptions from the caller's code must propagate."""
+        proc = FakeSyncProcessor()
+        trace = self._make_trace(proc)
+        with pytest.raises(ValueError, match="business logic error"):
+            with trace.span(name="test", fail_open=True):
+                raise ValueError("business logic error")
+
+    def test_fail_open_body_exception_with_end_error(self) -> None:
+        """Body exception propagates even when end_span also fails."""
+        proc = FakeSyncProcessor(fail_on_end=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ValueError, match="business logic error"):
+            with trace.span(name="test", fail_open=True):
+                raise ValueError("business logic error")
+
+    def test_fail_open_body_exception_after_start_error(self) -> None:
+        """Body exception propagates unchained when span setup failed and was suppressed."""
+        proc = FakeSyncProcessor(fail_on_start=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ValueError, match="business logic error") as exc_info:
+            with trace.span(name="test", fail_open=True) as span:
+                assert span is None
+                raise ValueError("business logic error")
+        assert exc_info.value.__context__ is None
+        assert proc.ended == []
+
+    def test_happy_path_unchanged(self) -> None:
+        proc = FakeSyncProcessor()
+        trace = self._make_trace(proc)
+        with trace.span(name="test") as span:
+            assert span is not None
+            assert span.name == "test"
+        assert len(proc.started) == 1
+        assert len(proc.ended) == 1
+
+    def test_no_trace_id_yields_none(self) -> None:
+        trace = Trace(processors=[], client=Mock(), trace_id=None)
+        with trace.span(name="test", fail_open=True) as span:
+            assert span is None
+
+
+# ---------------------------------------------------------------------------
+# Async Trace tests
+# ---------------------------------------------------------------------------
+
+
+class TestAsyncSpanFailOpen:
+    def _make_trace(self, processor: FakeAsyncProcessor) -> AsyncTrace:
+        return AsyncTrace(
+            processors=[processor],  # type: ignore[list-item]
+            client=AsyncMock(),
+            trace_id="test-trace",
+        )
+
+    @pytest.mark.asyncio
+    async def test_default_propagates_start_error(self) -> None:
+        proc = FakeAsyncProcessor(fail_on_start=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ConnectionError):
+            async with trace.span(name="test"):
+                pass
+
+    @pytest.mark.asyncio
+    async def test_default_propagates_end_error(self) -> None:
+        proc = FakeAsyncProcessor(fail_on_end=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ConnectionError):
+            async with trace.span(name="test"):
+                pass
+
+    @pytest.mark.asyncio
+    async def test_fail_open_suppresses_start_error(self) -> None:
+        proc = FakeAsyncProcessor(fail_on_start=True)
+        trace = self._make_trace(proc)
+        async with trace.span(name="test", fail_open=True) as span:
+            assert span is None
+
+    @pytest.mark.asyncio
+    async def test_fail_open_suppresses_end_error(self) -> None:
+        proc = FakeAsyncProcessor(fail_on_end=True)
+        trace = self._make_trace(proc)
+        async with trace.span(name="test", fail_open=True) as span:
+            assert span is not None
+
+    @pytest.mark.asyncio
+    async def test_fail_open_does_not_swallow_body_exception(self) -> None:
+        """The critical property: exceptions from the caller's code must propagate."""
+        proc = FakeAsyncProcessor()
+        trace = self._make_trace(proc)
+        with pytest.raises(ValueError, match="business logic error"):
+            async with trace.span(name="test", fail_open=True):
+                raise ValueError("business logic error")
+
+    @pytest.mark.asyncio
+    async def test_fail_open_body_exception_with_end_error(self) -> None:
+        """Body exception propagates even when end_span also fails."""
+        proc = FakeAsyncProcessor(fail_on_end=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ValueError, match="business logic error"):
+            async with trace.span(name="test", fail_open=True):
+                raise ValueError("business logic error")
+
+    @pytest.mark.asyncio
+    async def test_fail_open_body_exception_after_start_error(self) -> None:
+        """Body exception propagates unchained when span setup failed and was suppressed."""
+        proc = FakeAsyncProcessor(fail_on_start=True)
+        trace = self._make_trace(proc)
+        with pytest.raises(ValueError, match="business logic error") as exc_info:
+            async with trace.span(name="test", fail_open=True) as span:
+                assert span is None
+                raise ValueError("business logic error")
+        assert exc_info.value.__context__ is None
+        assert proc.ended == []
+
+    @pytest.mark.asyncio
+    async def test_happy_path_unchanged(self) -> None:
+        proc = FakeAsyncProcessor()
+        trace = self._make_trace(proc)
+        async with trace.span(name="test") as span:
+            assert span is not None
+            assert span.name == "test"
+        assert len(proc.started) == 1
+        assert len(proc.ended) == 1
+
+    @pytest.mark.asyncio
+    async def test_no_trace_id_yields_none(self) -> None:
+        trace = AsyncTrace(processors=[], client=AsyncMock(), trace_id=None)
+        async with trace.span(name="test", fail_open=True) as span:
+            assert span is None