Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@
add_trace_processor,
agent_span,
custom_span,
flush_traces,
function_span,
gen_span_id,
gen_trace_id,
Expand Down Expand Up @@ -451,6 +452,7 @@ def enable_verbose_stdout_logging():
"add_trace_processor",
"agent_span",
"custom_span",
"flush_traces",
"function_span",
"generation_span",
"get_current_span",
Expand Down
12 changes: 12 additions & 0 deletions src/agents/tracing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"add_trace_processor",
"agent_span",
"custom_span",
"flush_traces",
"function_span",
"generation_span",
"get_current_span",
Expand Down Expand Up @@ -108,3 +109,14 @@ def set_tracing_export_api_key(api_key: str) -> None:
Set the OpenAI API key for the backend exporter.
"""
default_exporter().set_api_key(api_key)


def flush_traces() -> None:
    """Synchronously push any buffered traces/spans to the exporter.

    Normally the default ``BatchTraceProcessor`` drains its buffer on a
    periodic schedule in a background thread. Use this helper when you cannot
    wait for that schedule — e.g. at the end of a request handler, a cron
    task, or a short-lived worker — and need the finished work's traces
    exported right away.
    """
    provider = get_trace_provider()
    provider.force_flush()
34 changes: 18 additions & 16 deletions src/agents/tracing/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ def __init__(
# We lazily start the background worker thread the first time a span/trace is queued.
self._worker_thread: threading.Thread | None = None
self._thread_start_lock = threading.Lock()
self._export_lock = threading.Lock()

def _ensure_thread_started(self) -> None:
# Fast path without holding the lock
Expand Down Expand Up @@ -571,25 +572,26 @@ def _export_batches(self, force: bool = False):
"""Drains the queue and exports in batches. If force=True, export everything.
Otherwise, export up to `max_batch_size` repeatedly until the queue is completely empty.
"""
while True:
items_to_export: list[Span[Any] | Trace] = []
with self._export_lock:
while True:
items_to_export: list[Span[Any] | Trace] = []

# Gather a batch of spans up to max_batch_size
while not self._queue.empty() and (
force or len(items_to_export) < self._max_batch_size
):
try:
items_to_export.append(self._queue.get_nowait())
except queue.Empty:
# Another thread might have emptied the queue between checks
break

# Gather a batch of spans up to max_batch_size
while not self._queue.empty() and (
force or len(items_to_export) < self._max_batch_size
):
try:
items_to_export.append(self._queue.get_nowait())
except queue.Empty:
# Another thread might have emptied the queue between checks
# If we collected nothing, we're done
if not items_to_export:
break

# If we collected nothing, we're done
if not items_to_export:
break

# Export the batch
self._exporter.export(items_to_export)
# Export the batch
self._exporter.export(items_to_export)


# Lazily initialized defaults to avoid creating network clients or threading
Expand Down
28 changes: 26 additions & 2 deletions src/agents/tracing/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,21 @@ def create_span(
) -> Span[TSpanData]:
"""Create a new span."""

# NOTE: deliberately NOT @abstractmethod. Making this abstract would break
# every pre-existing custom TraceProvider subclass that does not define it,
# which is exactly what the docstring (and the MinimalProvider test) promise
# will keep working.
def force_flush(self) -> None:
    """Force all registered processors to flush buffered traces/spans immediately.

    The default implementation is a no-op so existing custom ``TraceProvider``
    implementations continue to work without adding this method.
    """
    return None

def shutdown(self) -> None:
    """Clean up any resources used by the provider.

    The default implementation is a no-op so existing custom ``TraceProvider``
    implementations continue to work without adding this method.
    """
    return None


class DefaultTraceProvider(TraceProvider):
Expand Down Expand Up @@ -365,7 +377,19 @@ def create_span(
trace_metadata=trace_metadata,
)

def force_flush(self) -> None:
    """Force all processors to flush their buffers immediately."""
    # Re-check the env-driven disabled flag before doing any work; a disabled
    # provider never touches its processors.
    self._refresh_disabled_flag()
    if not self._disabled:
        try:
            self._multi_processor.force_flush()
        except Exception as e:
            logger.error(f"Error flushing trace provider: {e}")

def shutdown(self) -> None:
self._refresh_disabled_flag()
if self._disabled:
return

Expand Down
131 changes: 128 additions & 3 deletions tests/test_trace_processor.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import os
import threading
import time
from typing import Any, cast
from unittest.mock import MagicMock, patch

import httpx
import pytest

from agents.tracing import flush_traces, get_trace_provider
from agents.tracing.processor_interface import TracingExporter, TracingProcessor
from agents.tracing.processors import BackendSpanExporter, BatchTraceProcessor
from agents.tracing.provider import DefaultTraceProvider, TraceProvider
from agents.tracing.span_data import AgentSpanData
from agents.tracing.spans import Span, SpanImpl
from agents.tracing.traces import Trace, TraceImpl


def get_span(processor: TracingProcessor) -> SpanImpl[AgentSpanData]:
Expand Down Expand Up @@ -123,6 +126,34 @@ def test_batch_trace_processor_force_flush(mocked_exporter):
processor.shutdown()


def test_batch_trace_processor_force_flush_waits_for_in_flight_background_export():
    """force_flush() must block until a concurrent background export finishes."""
    started = threading.Event()
    release = threading.Event()

    class StallingExporter(TracingExporter):
        # Signals that an export began, then stalls until the test releases it.
        def export(self, items: list[Trace | Span[Any]]) -> None:
            started.set()
            assert release.wait(timeout=2.0)

    processor = BatchTraceProcessor(exporter=StallingExporter(), schedule_delay=0.01)
    processor.on_trace_start(get_trace(processor))

    # Wait until the background worker is mid-export.
    assert started.wait(timeout=2.0)

    flusher = threading.Thread(target=processor.force_flush)
    flusher.start()

    # While the export stalls, force_flush() must still be blocked.
    time.sleep(0.1)
    assert flusher.is_alive(), "force_flush() should wait for an in-flight export"

    release.set()
    flusher.join(timeout=2.0)

    assert not flusher.is_alive()

    processor.shutdown()


def test_batch_trace_processor_shutdown_flushes(mocked_exporter):
processor = BatchTraceProcessor(exporter=mocked_exporter, schedule_delay=5.0)
processor.on_trace_start(get_trace(processor))
Expand Down Expand Up @@ -171,6 +202,100 @@ def test_batch_trace_processor_scheduled_export(mocked_exporter):
assert total_exported == 1, "Item should be exported after scheduled delay"


def test_flush_traces_delegates_to_default_trace_provider():
    """flush_traces() forwards to force_flush() on every registered processor."""
    provider = DefaultTraceProvider()
    registered = MagicMock()
    provider.register_processor(registered)

    # Swap in our provider as the global one for the duration of the call.
    with patch("agents.tracing.setup.GLOBAL_TRACE_PROVIDER", provider):
        flush_traces()

    registered.force_flush.assert_called_once()


def test_flush_traces_is_importable_from_top_level_agents_package():
    """The helper is re-exported at the package root, not just under agents.tracing."""
    import agents

    assert agents.flush_traces is flush_traces


def test_default_trace_provider_force_flush_respects_disabled_flag():
    """When tracing is disabled, force_flush() must not touch any processor."""
    provider = DefaultTraceProvider()
    registered = MagicMock()
    provider.register_processor(registered)

    provider.set_disabled(True)
    provider.force_flush()

    registered.force_flush.assert_not_called()


def test_trace_provider_force_flush_and_shutdown_default_to_noops():
    """A provider implementing only the abstract API can still be flushed and shut down.

    Guards backward compatibility: ``force_flush`` and ``shutdown`` have
    default no-op implementations on ``TraceProvider``, so custom providers
    written before these hooks existed keep working.
    """

    class MinimalProvider(TraceProvider):
        # Deliberately does NOT define force_flush() or shutdown() — the point
        # of the test is that the base-class defaults are used.
        def register_processor(self, processor: TracingProcessor) -> None:
            pass

        def set_processors(self, processors: list[TracingProcessor]) -> None:
            pass

        def get_current_trace(self):
            return None

        def get_current_span(self):
            return None

        def set_disabled(self, disabled: bool) -> None:
            pass

        def time_iso(self) -> str:
            return ""

        def gen_trace_id(self) -> str:
            return "trace_123"

        def gen_span_id(self) -> str:
            return "span_123"

        def gen_group_id(self) -> str:
            return "group_123"

        def create_trace(
            self,
            name,
            trace_id=None,
            group_id=None,
            metadata=None,
            disabled=False,
            tracing=None,
        ):
            raise NotImplementedError

        def create_span(self, span_data, span_id=None, parent=None, disabled=False):
            raise NotImplementedError

    provider = MinimalProvider()
    # Neither call should raise even though MinimalProvider never defined them.
    provider.force_flush()
    provider.shutdown()


def test_get_trace_provider_force_flush_flushes_default_processor(mocked_exporter):
    """force_flush() on the global provider drains a registered batch processor."""
    provider = DefaultTraceProvider()
    # Long schedule_delay so nothing is exported unless we flush explicitly.
    processor = BatchTraceProcessor(exporter=mocked_exporter, schedule_delay=60.0)
    provider.register_processor(processor)

    with patch("agents.tracing.setup.GLOBAL_TRACE_PROVIDER", provider):
        processor.on_trace_start(get_trace(processor))
        processor.on_span_end(get_span(processor))

        get_trace_provider().force_flush()

        exported_count = 0
        for call_args in mocked_exporter.export.call_args_list:
            exported_count += len(call_args[0][0])
        assert exported_count == 2
        processor.shutdown()


@pytest.fixture
def patched_time_sleep():
"""
Expand Down
Loading