From 7ea9aa01efd680ea7da89ae3539e80b6f1947395 Mon Sep 17 00:00:00 2001 From: Kazuhiro Sera Date: Sat, 4 Apr 2026 12:12:34 +0900 Subject: [PATCH] feat: #2135 add public flush_traces API --- src/agents/__init__.py | 2 + src/agents/tracing/__init__.py | 12 +++ src/agents/tracing/processors.py | 34 ++++---- src/agents/tracing/provider.py | 28 ++++++- tests/test_trace_processor.py | 131 ++++++++++++++++++++++++++++++- 5 files changed, 186 insertions(+), 21 deletions(-) diff --git a/src/agents/__init__.py b/src/agents/__init__.py index 214e814d3e..54c739bbda 100644 --- a/src/agents/__init__.py +++ b/src/agents/__init__.py @@ -203,6 +203,7 @@ add_trace_processor, agent_span, custom_span, + flush_traces, function_span, gen_span_id, gen_trace_id, @@ -451,6 +452,7 @@ def enable_verbose_stdout_logging(): "add_trace_processor", "agent_span", "custom_span", + "flush_traces", "function_span", "generation_span", "get_current_span", diff --git a/src/agents/tracing/__init__.py b/src/agents/tracing/__init__.py index 9f5e4f7568..76a77fe0ab 100644 --- a/src/agents/tracing/__init__.py +++ b/src/agents/tracing/__init__.py @@ -42,6 +42,7 @@ "add_trace_processor", "agent_span", "custom_span", + "flush_traces", "function_span", "generation_span", "get_current_span", @@ -108,3 +109,14 @@ def set_tracing_export_api_key(api_key: str) -> None: Set the OpenAI API key for the backend exporter. """ default_exporter().set_api_key(api_key) + + +def flush_traces() -> None: + """Force immediate export of buffered traces and spans. + + The default ``BatchTraceProcessor`` already exports traces periodically in the + background. Call this when a worker, background job, or request handler needs + traces to be visible immediately after a unit of work finishes instead of + waiting for the next scheduled flush. + """ + get_trace_provider().force_flush() diff --git a/src/agents/tracing/processors.py b/src/agents/tracing/processors.py index 7132faf1c8..1f8cfa9d4e 100644 --- a/src/agents/tracing/processors.py +++ b/src/agents/tracing/processors.py @@ -491,6 +491,7 @@ def __init__( # We lazily start the background worker thread the first time a span/trace is queued. self._worker_thread: threading.Thread | None = None self._thread_start_lock = threading.Lock() + self._export_lock = threading.Lock() def _ensure_thread_started(self) -> None: # Fast path without holding the lock @@ -571,25 +572,26 @@ def _export_batches(self, force: bool = False): """Drains the queue and exports in batches. If force=True, export everything. Otherwise, export up to `max_batch_size` repeatedly until the queue is completely empty. """ - while True: - items_to_export: list[Span[Any] | Trace] = [] + with self._export_lock: + while True: + items_to_export: list[Span[Any] | Trace] = [] + + # Gather a batch of spans up to max_batch_size + while not self._queue.empty() and ( + force or len(items_to_export) < self._max_batch_size + ): + try: + items_to_export.append(self._queue.get_nowait()) + except queue.Empty: + # Another thread might have emptied the queue between checks + break - # Gather a batch of spans up to max_batch_size - while not self._queue.empty() and ( - force or len(items_to_export) < self._max_batch_size - ): - try: - items_to_export.append(self._queue.get_nowait()) - except queue.Empty: - # Another thread might have emptied the queue between checks + # If we collected nothing, we're done + if not items_to_export: break - # If we collected nothing, we're done - if not items_to_export: - break - - # Export the batch - self._exporter.export(items_to_export) + # Export the batch + self._exporter.export(items_to_export) # Lazily initialized defaults to avoid creating network clients or threading diff --git a/src/agents/tracing/provider.py b/src/agents/tracing/provider.py index 90ea85cbf0..e37841ddf2 100644 --- a/src/agents/tracing/provider.py +++ b/src/agents/tracing/provider.py @@ -188,9 +188,21 @@ def create_span( ) -> Span[TSpanData]: """Create a new span.""" - @abstractmethod + def force_flush(self) -> None: + """Force all registered processors to flush buffered traces/spans immediately. + + The default implementation is a no-op so existing custom ``TraceProvider`` + implementations continue to work without adding this method. + """ + return None + def shutdown(self) -> None: - """Clean up any resources used by the provider.""" + """Clean up any resources used by the provider. + + The default implementation is a no-op so existing custom ``TraceProvider`` + implementations continue to work without adding this method. + """ + return None class DefaultTraceProvider(TraceProvider): @@ -365,7 +377,19 @@ def create_span( trace_metadata=trace_metadata, ) + def force_flush(self) -> None: + """Force all processors to flush their buffers immediately.""" + self._refresh_disabled_flag() + if self._disabled: + return + + try: + self._multi_processor.force_flush() + except Exception as e: + logger.error(f"Error flushing trace provider: {e}") + def shutdown(self) -> None: + self._refresh_disabled_flag() if self._disabled: return diff --git a/tests/test_trace_processor.py b/tests/test_trace_processor.py index ad061d7995..1c917990d5 100644 --- a/tests/test_trace_processor.py +++ b/tests/test_trace_processor.py @@ -1,4 +1,5 @@ import os +import threading import time from typing import Any, cast from unittest.mock import MagicMock, patch @@ -6,11 +7,13 @@ import httpx import pytest -from agents.tracing.processor_interface import TracingProcessor +from agents.tracing import flush_traces, get_trace_provider +from agents.tracing.processor_interface import TracingExporter, TracingProcessor from agents.tracing.processors import BackendSpanExporter, BatchTraceProcessor +from agents.tracing.provider import DefaultTraceProvider, TraceProvider from agents.tracing.span_data import AgentSpanData -from agents.tracing.spans import SpanImpl -from agents.tracing.traces import TraceImpl +from agents.tracing.spans import Span, SpanImpl +from agents.tracing.traces import Trace, TraceImpl def get_span(processor: TracingProcessor) -> SpanImpl[AgentSpanData]: @@ -123,6 +126,34 @@ def test_batch_trace_processor_force_flush(mocked_exporter): processor.shutdown() +def test_batch_trace_processor_force_flush_waits_for_in_flight_background_export(): + export_started = threading.Event() + export_continue = threading.Event() + + class BlockingExporter(TracingExporter): + def export(self, items: list[Trace | Span[Any]]) -> None: + export_started.set() + assert export_continue.wait(timeout=2.0) + + processor = BatchTraceProcessor(exporter=BlockingExporter(), schedule_delay=0.01) + processor.on_trace_start(get_trace(processor)) + + assert export_started.wait(timeout=2.0) + + flush_thread = threading.Thread(target=processor.force_flush) + flush_thread.start() + + time.sleep(0.1) + assert flush_thread.is_alive(), "force_flush() should wait for an in-flight export" + + export_continue.set() + flush_thread.join(timeout=2.0) + + assert not flush_thread.is_alive() + + processor.shutdown() + + def test_batch_trace_processor_shutdown_flushes(mocked_exporter): processor = BatchTraceProcessor(exporter=mocked_exporter, schedule_delay=5.0) processor.on_trace_start(get_trace(processor)) @@ -171,6 +202,100 @@ def test_batch_trace_processor_scheduled_export(mocked_exporter): assert total_exported == 1, "Item should be exported after scheduled delay" +def test_flush_traces_delegates_to_default_trace_provider(): + provider = DefaultTraceProvider() + mock_processor = MagicMock() + provider.register_processor(mock_processor) + + with patch("agents.tracing.setup.GLOBAL_TRACE_PROVIDER", provider): + flush_traces() + + mock_processor.force_flush.assert_called_once() + + +def test_flush_traces_is_importable_from_top_level_agents_package(): + from agents import flush_traces as top_level_flush_traces + + assert top_level_flush_traces is flush_traces + + +def test_default_trace_provider_force_flush_respects_disabled_flag(): + provider = DefaultTraceProvider() + mock_processor = MagicMock() + provider.register_processor(mock_processor) + + provider.set_disabled(True) + provider.force_flush() + + mock_processor.force_flush.assert_not_called() + + +def test_trace_provider_force_flush_and_shutdown_default_to_noops(): + class MinimalProvider(TraceProvider): + def register_processor(self, processor: TracingProcessor) -> None: + pass + + def set_processors(self, processors: list[TracingProcessor]) -> None: + pass + + def get_current_trace(self): + return None + + def get_current_span(self): + return None + + def set_disabled(self, disabled: bool) -> None: + pass + + def time_iso(self) -> str: + return "" + + def gen_trace_id(self) -> str: + return "trace_123" + + def gen_span_id(self) -> str: + return "span_123" + + def gen_group_id(self) -> str: + return "group_123" + + def create_trace( + self, + name, + trace_id=None, + group_id=None, + metadata=None, + disabled=False, + tracing=None, + ): + raise NotImplementedError + + def create_span(self, span_data, span_id=None, parent=None, disabled=False): + raise NotImplementedError + + provider = MinimalProvider() + provider.force_flush() + provider.shutdown() + + +def test_get_trace_provider_force_flush_flushes_default_processor(mocked_exporter): + provider = DefaultTraceProvider() + processor = BatchTraceProcessor(exporter=mocked_exporter, schedule_delay=60.0) + provider.register_processor(processor) + + with patch("agents.tracing.setup.GLOBAL_TRACE_PROVIDER", provider): + processor.on_trace_start(get_trace(processor)) + processor.on_span_end(get_span(processor)) + + get_trace_provider().force_flush() + + total_exported = sum( + len(call_args[0][0]) for call_args in mocked_exporter.export.call_args_list + ) + assert total_exported == 2 + processor.shutdown() + + @pytest.fixture def patched_time_sleep(): """