Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 36 additions & 20 deletions src/agentex/lib/adk/_modules/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ def _tracing_service(self) -> TracingService:
# Re-create the underlying httpx client when the event loop changes
# (e.g. between HTTP requests in a sync ASGI server) to avoid
# "Event loop is closed" / "bound to a different event loop" errors.
if self._tracing_service_lazy is None or (
loop_id is not None and loop_id != self._bound_loop_id
):
if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id):
import httpx

# Disable keepalive so each span HTTP call gets a fresh TCP
Expand Down Expand Up @@ -96,13 +94,18 @@ async def span(
start_to_close_timeout: timedelta = timedelta(seconds=5),
heartbeat_timeout: timedelta = timedelta(seconds=5),
retry_policy: RetryPolicy = DEFAULT_RETRY_POLICY,
fail_open: bool = False,
) -> AsyncGenerator[Span | None, None]:
"""
Async context manager for creating and automatically closing a span.
Yields the started span object. The span is automatically ended when the context exits.

If trace_id is falsy, acts as a no-op context manager.

When fail_open is True, tracing errors during span setup or teardown are
logged and suppressed instead of propagated. This ensures that tracing
infrastructure failures never interrupt the caller's business logic.

Args:
trace_id (str): The trace ID for the span.
name (str): The name of the span.
Expand All @@ -112,6 +115,7 @@ async def span(
start_to_close_timeout (timedelta): The start to close timeout for the span.
heartbeat_timeout (timedelta): The heartbeat timeout for the span.
retry_policy (RetryPolicy): The retry policy for the span.
fail_open (bool): If True, suppress tracing errors instead of raising.

Returns:
AsyncGenerator[Optional[Span], None]: An async generator that yields the started span object.
Expand All @@ -120,27 +124,39 @@ async def span(
yield None
return

span: Span | None = await self.start_span(
trace_id=trace_id,
name=name,
input=input,
parent_id=parent_id,
data=data,
start_to_close_timeout=start_to_close_timeout,
heartbeat_timeout=heartbeat_timeout,
retry_policy=retry_policy,
)
try:
span: Span | None = await self.start_span(
trace_id=trace_id,
name=name,
input=input,
parent_id=parent_id,
data=data,
start_to_close_timeout=start_to_close_timeout,
heartbeat_timeout=heartbeat_timeout,
retry_policy=retry_policy,
)
except Exception:
if fail_open:
logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True)
yield None
return
raise
try:
yield span
finally:
if span:
await self.end_span(
trace_id=trace_id,
span=span,
start_to_close_timeout=start_to_close_timeout,
heartbeat_timeout=heartbeat_timeout,
retry_policy=retry_policy,
)
try:
await self.end_span(
trace_id=trace_id,
span=span,
start_to_close_timeout=start_to_close_timeout,
heartbeat_timeout=heartbeat_timeout,
retry_policy=retry_policy,
)
except Exception:
if not fail_open:
raise
logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True)

async def start_span(
self,
Expand Down
53 changes: 42 additions & 11 deletions src/agentex/lib/core/tracing/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,36 @@ def span(
parent_id: str | None = None,
input: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
data: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
fail_open: bool = False,
):
"""
Context manager for spans.
If trace_id is falsy, acts as a no-op context manager.

When fail_open is True, tracing errors during span setup or teardown are
logged and suppressed instead of propagated. This ensures that tracing
infrastructure failures never interrupt the caller's business logic.
"""
if not self.trace_id:
yield None
return
span = self.start_span(name, parent_id, input, data)
try:
span = self.start_span(name, parent_id, input, data)
except Exception:
if fail_open:
logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True)
yield None
return
raise
try:
yield span
finally:
self.end_span(span)
try:
self.end_span(span)
except Exception:
if not fail_open:
raise
logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True)


class AsyncTrace:
Expand Down Expand Up @@ -225,9 +242,7 @@ async def start_span(
)

if self.processors:
await asyncio.gather(
*[processor.on_span_start(span) for processor in self.processors]
)
await asyncio.gather(*[processor.on_span_start(span) for processor in self.processors])

return span

Expand All @@ -252,9 +267,7 @@ async def end_span(
span.data = recursive_model_dump(span.data) if span.data else None

if self.processors:
await asyncio.gather(
*[processor.on_span_end(span) for processor in self.processors]
)
await asyncio.gather(*[processor.on_span_end(span) for processor in self.processors])

return span

Expand Down Expand Up @@ -290,24 +303,42 @@ async def span(
parent_id: str | None = None,
input: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
data: dict[str, Any] | list[dict[str, Any]] | BaseModel | None = None,
fail_open: bool = False,
) -> AsyncGenerator[Span | None, None]:
"""
Context manager for spans.

When fail_open is True, tracing errors during span setup or teardown are
logged and suppressed instead of propagated. This ensures that tracing
infrastructure failures never interrupt the caller's business logic.

Args:
name: Name of the span.
parent_id: Optional parent span ID.
input: Optional input data for the span.
data: Optional additional data for the span.
fail_open: If True, suppress tracing errors instead of raising.

Yields:
The span object.
The span object, or None if fail_open is True and setup failed.
"""
if not self.trace_id:
yield None
return
span = await self.start_span(name, parent_id, input, data)
try:
span = await self.start_span(name, parent_id, input, data)
except Exception:
if fail_open:
logger.warning(f"Tracing span setup failed for '{name}', proceeding without tracing", exc_info=True)
yield None
return
raise
try:
yield span
finally:
await self.end_span(span)
try:
await self.end_span(span)
except Exception:
if not fail_open:
raise
logger.warning(f"Tracing span teardown failed for '{name}'", exc_info=True)
Empty file added tests/lib/core/__init__.py
Empty file.
Empty file.
192 changes: 192 additions & 0 deletions tests/lib/core/tracing/test_span_fail_open.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
from __future__ import annotations

from typing import Any
from unittest.mock import Mock, AsyncMock

import pytest

from agentex.lib.core.tracing.trace import Trace, AsyncTrace


class FakeSyncProcessor:
def __init__(self, *, fail_on_start: bool = False, fail_on_end: bool = False):
self.fail_on_start = fail_on_start
self.fail_on_end = fail_on_end
self.started: list[Any] = []
self.ended: list[Any] = []

def on_span_start(self, span: Any) -> None:
self.started.append(span)
if self.fail_on_start:
raise ConnectionError("tracing backend unavailable")

def on_span_end(self, span: Any) -> None:
self.ended.append(span)
if self.fail_on_end:
raise ConnectionError("tracing backend unavailable")


class FakeAsyncProcessor:
def __init__(self, *, fail_on_start: bool = False, fail_on_end: bool = False):
self.fail_on_start = fail_on_start
self.fail_on_end = fail_on_end
self.started: list[Any] = []
self.ended: list[Any] = []

async def on_span_start(self, span: Any) -> None:
self.started.append(span)
if self.fail_on_start:
raise ConnectionError("tracing backend unavailable")

async def on_span_end(self, span: Any) -> None:
self.ended.append(span)
if self.fail_on_end:
raise ConnectionError("tracing backend unavailable")


# ---------------------------------------------------------------------------
# Sync Trace tests
# ---------------------------------------------------------------------------


class TestSyncSpanFailOpen:
def _make_trace(self, processor: FakeSyncProcessor) -> Trace:
return Trace(
processors=[processor], # type: ignore[list-item]
client=Mock(),
trace_id="test-trace",
)

def test_default_propagates_start_error(self) -> None:
proc = FakeSyncProcessor(fail_on_start=True)
trace = self._make_trace(proc)
with pytest.raises(ConnectionError):
with trace.span(name="test"):
pass

def test_default_propagates_end_error(self) -> None:
proc = FakeSyncProcessor(fail_on_end=True)
trace = self._make_trace(proc)
with pytest.raises(ConnectionError):
with trace.span(name="test"):
pass

def test_fail_open_suppresses_start_error(self) -> None:
proc = FakeSyncProcessor(fail_on_start=True)
trace = self._make_trace(proc)
with trace.span(name="test", fail_open=True) as span:
assert span is None

def test_fail_open_suppresses_end_error(self) -> None:
proc = FakeSyncProcessor(fail_on_end=True)
trace = self._make_trace(proc)
with trace.span(name="test", fail_open=True) as span:
assert span is not None

def test_fail_open_does_not_swallow_body_exception(self) -> None:
"""The critical property: exceptions from the caller's code must propagate."""
proc = FakeSyncProcessor()
trace = self._make_trace(proc)
with pytest.raises(ValueError, match="business logic error"):
with trace.span(name="test", fail_open=True):
raise ValueError("business logic error")

def test_fail_open_body_exception_with_end_error(self) -> None:
"""Body exception propagates even when end_span also fails."""
proc = FakeSyncProcessor(fail_on_end=True)
trace = self._make_trace(proc)
with pytest.raises(ValueError, match="business logic error"):
with trace.span(name="test", fail_open=True):
raise ValueError("business logic error")

def test_happy_path_unchanged(self) -> None:
proc = FakeSyncProcessor()
trace = self._make_trace(proc)
with trace.span(name="test") as span:
assert span is not None
assert span.name == "test"
assert len(proc.started) == 1
assert len(proc.ended) == 1

def test_no_trace_id_yields_none(self) -> None:
trace = Trace(processors=[], client=Mock(), trace_id=None)
with trace.span(name="test", fail_open=True) as span:
assert span is None


# ---------------------------------------------------------------------------
# Async Trace tests
# ---------------------------------------------------------------------------


class TestAsyncSpanFailOpen:
def _make_trace(self, processor: FakeAsyncProcessor) -> AsyncTrace:
return AsyncTrace(
processors=[processor], # type: ignore[list-item]
client=AsyncMock(),
trace_id="test-trace",
)

@pytest.mark.asyncio
async def test_default_propagates_start_error(self) -> None:
proc = FakeAsyncProcessor(fail_on_start=True)
trace = self._make_trace(proc)
with pytest.raises(ConnectionError):
async with trace.span(name="test"):
pass

@pytest.mark.asyncio
async def test_default_propagates_end_error(self) -> None:
proc = FakeAsyncProcessor(fail_on_end=True)
trace = self._make_trace(proc)
with pytest.raises(ConnectionError):
async with trace.span(name="test"):
pass

@pytest.mark.asyncio
async def test_fail_open_suppresses_start_error(self) -> None:
proc = FakeAsyncProcessor(fail_on_start=True)
trace = self._make_trace(proc)
async with trace.span(name="test", fail_open=True) as span:
assert span is None

@pytest.mark.asyncio
async def test_fail_open_suppresses_end_error(self) -> None:
proc = FakeAsyncProcessor(fail_on_end=True)
trace = self._make_trace(proc)
async with trace.span(name="test", fail_open=True) as span:
assert span is not None

@pytest.mark.asyncio
async def test_fail_open_does_not_swallow_body_exception(self) -> None:
"""The critical property: exceptions from the caller's code must propagate."""
proc = FakeAsyncProcessor()
trace = self._make_trace(proc)
with pytest.raises(ValueError, match="business logic error"):
async with trace.span(name="test", fail_open=True):
raise ValueError("business logic error")

@pytest.mark.asyncio
async def test_fail_open_body_exception_with_end_error(self) -> None:
"""Body exception propagates even when end_span also fails."""
proc = FakeAsyncProcessor(fail_on_end=True)
trace = self._make_trace(proc)
with pytest.raises(ValueError, match="business logic error"):
async with trace.span(name="test", fail_open=True):
raise ValueError("business logic error")

@pytest.mark.asyncio
async def test_happy_path_unchanged(self) -> None:
proc = FakeAsyncProcessor()
trace = self._make_trace(proc)
async with trace.span(name="test") as span:
assert span is not None
assert span.name == "test"
assert len(proc.started) == 1
assert len(proc.ended) == 1

@pytest.mark.asyncio
async def test_no_trace_id_yields_none(self) -> None:
trace = AsyncTrace(processors=[], client=AsyncMock(), trace_id=None)
async with trace.span(name="test", fail_open=True) as span:
assert span is None
Loading