From e5ad308c22092ff199d5b97ec267988cafe9152b Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Mon, 12 Jan 2026 08:47:10 +0100 Subject: [PATCH 1/2] feat(integrations): openai-agents streaming support --- sentry_sdk/integrations/mcp.py | 29 +++++++- .../integrations/openai_agents/__init__.py | 9 ++- .../openai_agents/patches/__init__.py | 2 +- .../openai_agents/patches/agent_run.py | 41 +++++++++++ .../openai_agents/patches/runner.py | 72 +++++++++++++++++++ .../openai_agents/test_openai_agents.py | 21 ++++++ 6 files changed, 169 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/mcp.py b/sentry_sdk/integrations/mcp.py index 47fda272b7..a8dc78f18e 100644 --- a/sentry_sdk/integrations/mcp.py +++ b/sentry_sdk/integrations/mcp.py @@ -7,6 +7,7 @@ Supports the low-level `mcp.server.lowlevel.Server` API. """ +from contextlib import contextmanager import inspect from functools import wraps from typing import TYPE_CHECKING @@ -352,6 +353,30 @@ def _prepare_handler_data( ) +@contextmanager +def ensure_span(*args, **kwargs): + """Ensure a span is created for the current context.""" + + current_span = sentry_sdk.get_current_span() + transaction_exists = ( + current_span is not None and current_span.containing_transaction is not None + ) + + if transaction_exists: + with sentry_sdk.start_span(*args, **kwargs) as span: + yield span + else: + with sentry_sdk.start_transaction(*args, **kwargs): + with sentry_sdk.start_span(*args, **kwargs) as span: + yield span + # with get_start_span_function()( + # op=OP.MCP_SERVER, + # name=span_name, + # origin=MCPIntegration.origin, + # ) as span: + # yield span + + async def _async_handler_wrapper( handler_type: str, func: "Callable[..., Any]", @@ -382,7 +407,7 @@ async def _async_handler_wrapper( ) = _prepare_handler_data(handler_type, original_args, original_kwargs) # Start span and execute - with get_start_span_function()( + with ensure_span( op=OP.MCP_SERVER, name=span_name, origin=MCPIntegration.origin, @@ -454,7 +479,7 @@ def _sync_handler_wrapper( ) = _prepare_handler_data(handler_type, original_args) # Start span and execute - with get_start_span_function()( + with ensure_span( op=OP.MCP_SERVER, name=span_name, origin=MCPIntegration.origin, diff --git a/sentry_sdk/integrations/openai_agents/__init__.py b/sentry_sdk/integrations/openai_agents/__init__.py index b121557fbd..deb136de01 100644 --- a/sentry_sdk/integrations/openai_agents/__init__.py +++ b/sentry_sdk/integrations/openai_agents/__init__.py @@ -4,6 +4,7 @@ _create_get_model_wrapper, _create_get_all_tools_wrapper, _create_run_wrapper, + _create_run_streamed_wrapper, _patch_agent_run, _patch_error_tracing, ) @@ -25,12 +26,16 @@ def _patch_runner() -> None: # Create the root span for one full agent run (including eventual handoffs) # Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around # agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately. - # TODO-anton: Also patch streaming runner: agents.Runner.run_streamed agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper( agents.run.DEFAULT_AGENT_RUNNER.run ) - # Creating the actual spans for each agent run. + # Patch streaming runner + agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper( + agents.run.DEFAULT_AGENT_RUNNER.run_streamed + ) + + # Creating the actual spans for each agent run (works for both streaming and non-streaming). 
_patch_agent_run() diff --git a/sentry_sdk/integrations/openai_agents/patches/__init__.py b/sentry_sdk/integrations/openai_agents/patches/__init__.py index 33058f01a1..b53ca79e19 100644 --- a/sentry_sdk/integrations/openai_agents/patches/__init__.py +++ b/sentry_sdk/integrations/openai_agents/patches/__init__.py @@ -1,5 +1,5 @@ from .models import _create_get_model_wrapper # noqa: F401 from .tools import _create_get_all_tools_wrapper # noqa: F401 -from .runner import _create_run_wrapper # noqa: F401 +from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401 from .agent_run import _patch_agent_run # noqa: F401 from .error_tracing import _patch_error_tracing # noqa: F401 diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index 29649af945..5b29f2ddaa 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -31,6 +31,7 @@ def _patch_agent_run() -> None: # Store original methods original_run_single_turn = agents.run.AgentRunner._run_single_turn + original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs original_execute_final_output = agents._run_impl.RunImpl.execute_final_output @@ -149,8 +150,48 @@ async def patched_execute_final_output( return result + @wraps( + original_run_single_turn_streamed.__func__ + if hasattr(original_run_single_turn_streamed, "__func__") + else original_run_single_turn_streamed + ) + async def patched_run_single_turn_streamed( + cls: "agents.Runner", *args: "Any", **kwargs: "Any" + ) -> "Any": + """Patched _run_single_turn_streamed that creates agent invocation spans for streaming""" + agent = kwargs.get("agent") + context_wrapper = kwargs.get("context_wrapper") + should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks") + + span = getattr(context_wrapper, "_sentry_agent_span", None) + # Start agent span when agent starts (but only once per agent) + if should_run_agent_start_hooks and agent and context_wrapper: + # End any existing span for a different agent + if _has_active_agent_span(context_wrapper): + current_agent = _get_current_agent(context_wrapper) + if current_agent and current_agent != agent: + end_invoke_agent_span(context_wrapper, current_agent) + + span = _start_invoke_agent_span(context_wrapper, agent, kwargs) + agent._sentry_agent_span = span + + # Call original streaming method + try: + result = await original_run_single_turn_streamed(*args, **kwargs) + except Exception as exc: + if span is not None and span.timestamp is None: + _record_exception_on_span(span, exc) + end_invoke_agent_span(context_wrapper, agent) + + reraise(*sys.exc_info()) + + return result + # Apply patches agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn) + agents.run.AgentRunner._run_single_turn_streamed = classmethod( + patched_run_single_turn_streamed + ) agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs) agents._run_impl.RunImpl.execute_final_output = classmethod( patched_execute_final_output diff --git a/sentry_sdk/integrations/openai_agents/patches/runner.py b/sentry_sdk/integrations/openai_agents/patches/runner.py index 1d3bbc894b..656f509c96 100644 --- a/sentry_sdk/integrations/openai_agents/patches/runner.py +++ b/sentry_sdk/integrations/openai_agents/patches/runner.py @@ -1,3 +1,4 @@ +import sys from functools 
import wraps import sentry_sdk @@ -64,3 +65,74 @@ async def wrapper(*args: "Any", **kwargs: "Any") -> "Any": return run_result return wrapper + + +def _create_run_streamed_wrapper( + original_func: "Callable[..., Any]", +) -> "Callable[..., Any]": + """ + Wraps the agents.Runner.run_streamed method to create a root span for streaming agent workflow runs. + + Unlike run(), run_streamed() returns immediately with a RunResultStreaming object + while execution continues in a background task. The workflow span must stay open + throughout the streaming operation and close when streaming completes or is abandoned. + """ + + @wraps(original_func) + def wrapper(*args: "Any", **kwargs: "Any") -> "Any": + # Isolate each workflow so that when agents are run in asyncio tasks they + # don't touch each other's scopes + isolation_scope = sentry_sdk.isolation_scope() + isolation_scope.__enter__() + + # Clone agent because agent invocation spans are attached per run. + agent = args[0].clone() + + # Start workflow span immediately (before run_streamed returns) + workflow_span = agent_workflow_span(agent) + workflow_span.__enter__() + + # Store span and scope on agent for cleanup + agent._sentry_workflow_span = workflow_span + agent._sentry_isolation_scope = isolation_scope + + args = (agent, *args[1:]) + + try: + # Call original function to get RunResultStreaming + run_result = original_func(*args, **kwargs) + except Exception as exc: + # If run_streamed itself fails (not the background task), clean up immediately + workflow_span.__exit__(*sys.exc_info()) + isolation_scope.__exit__(None, None, None) + _capture_exception(exc) + raise exc from None + + # Wrap the result to ensure cleanup when streaming completes + original_aclose = getattr(run_result, "aclose", None) + + async def wrapped_aclose() -> None: + """Close streaming result and clean up Sentry spans""" + try: + if original_aclose is not None: + await original_aclose() + finally: + # End any remaining agent span + if hasattr(run_result, "context_wrapper"): + end_invoke_agent_span(run_result.context_wrapper, agent) + + # End workflow span + if hasattr(agent, "_sentry_workflow_span"): + workflow_span.__exit__(None, None, None) + delattr(agent, "_sentry_workflow_span") + + # Exit isolation scope + if hasattr(agent, "_sentry_isolation_scope"): + isolation_scope.__exit__(None, None, None) + delattr(agent, "_sentry_isolation_scope") + + run_result.aclose = wrapped_aclose + + return run_result + + return wrapper diff --git a/tests/integrations/openai_agents/test_openai_agents.py b/tests/integrations/openai_agents/test_openai_agents.py index c5cb25dfee..85e71b3da1 100644 --- a/tests/integrations/openai_agents/test_openai_agents.py +++ b/tests/integrations/openai_agents/test_openai_agents.py @@ -1998,3 +1998,24 @@ def test_openai_agents_message_truncation(sentry_init, capture_events): assert len(parsed_messages) == 2 assert "small message 4" in str(parsed_messages[0]) assert "small message 5" in str(parsed_messages[1]) + + +def test_streaming_patches_applied(sentry_init): + """ + Test that the streaming patches are applied correctly. 
+ """ + sentry_init( + integrations=[OpenAIAgentsIntegration()], + traces_sample_rate=1.0, + ) + + # Verify that run_streamed is patched (will have __wrapped__ attribute if patched) + import agents + + # Check that the method exists and has been modified + assert hasattr(agents.run.DEFAULT_AGENT_RUNNER, "run_streamed") + assert hasattr(agents.run.AgentRunner, "_run_single_turn_streamed") + + # Verify the patches were applied by checking for our wrapper + run_streamed_func = agents.run.DEFAULT_AGENT_RUNNER.run_streamed + assert run_streamed_func is not None From 8884375874931937a090c5cfa64b80801198e88b Mon Sep 17 00:00:00 2001 From: Fabian Schindler Date: Mon, 12 Jan 2026 13:18:04 +0100 Subject: [PATCH 2/2] fix: improved instrumentation --- .../openai_agents/patches/agent_run.py | 43 ++++++++++++++-- .../openai_agents/patches/models.py | 51 +++++++++++++++++++ .../openai_agents/patches/runner.py | 40 +++------------ 3 files changed, 97 insertions(+), 37 deletions(-) diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py index 5b29f2ddaa..0241ac784f 100644 --- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py +++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py @@ -1,6 +1,7 @@ import sys from functools import wraps +from sentry_sdk.consts import SPANDATA from sentry_sdk.integrations import DidNotEnable from sentry_sdk.utils import reraise from ..spans import ( @@ -148,6 +149,13 @@ async def patched_execute_final_output( if agent and context_wrapper and _has_active_agent_span(context_wrapper): end_invoke_agent_span(context_wrapper, agent, final_output) + # For streaming: close the workflow span if it exists + # (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper) + if agent and hasattr(agent, "_sentry_workflow_span"): + workflow_span = agent._sentry_workflow_span + workflow_span.__exit__(None, None, None) + delattr(agent, "_sentry_workflow_span") + return result @wraps( @@ -158,10 +166,28 @@ async def patched_execute_final_output( async def patched_run_single_turn_streamed( cls: "agents.Runner", *args: "Any", **kwargs: "Any" ) -> "Any": - """Patched _run_single_turn_streamed that creates agent invocation spans for streaming""" - agent = kwargs.get("agent") - context_wrapper = kwargs.get("context_wrapper") - should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks") + """Patched _run_single_turn_streamed that creates agent invocation spans for streaming. + + Note: Unlike _run_single_turn which uses keyword-only arguments (*,), + _run_single_turn_streamed uses positional arguments. 
The call signature is: + _run_single_turn_streamed( + streamed_result, # args[0] + agent, # args[1] + hooks, # args[2] + context_wrapper, # args[3] + run_config, # args[4] + should_run_agent_start_hooks, # args[5] + tool_use_tracker, # args[6] + all_tools, # args[7] + server_conversation_tracker, # args[8] (optional) + ) + """ + # Extract positional arguments (streaming version doesn't use keyword-only args) + agent = args[1] if len(args) > 1 else kwargs.get("agent") + context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper") + should_run_agent_start_hooks = ( + args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks") + ) span = getattr(context_wrapper, "_sentry_agent_span", None) # Start agent span when agent starts (but only once per agent) @@ -172,7 +198,14 @@ async def patched_run_single_turn_streamed( if current_agent and current_agent != agent: end_invoke_agent_span(context_wrapper, current_agent) - span = _start_invoke_agent_span(context_wrapper, agent, kwargs) + # Build kwargs dict for span creation (for compatibility with _start_invoke_agent_span) + span_kwargs = { + "agent": agent, + "context_wrapper": context_wrapper, + "should_run_agent_start_hooks": should_run_agent_start_hooks, + } + span = _start_invoke_agent_span(context_wrapper, agent, span_kwargs) + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) agent._sentry_agent_span = span # Call original streaming method diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py index a9b3c16a22..8099cfef89 100644 --- a/sentry_sdk/integrations/openai_agents/patches/models.py +++ b/sentry_sdk/integrations/openai_agents/patches/models.py @@ -73,6 +73,57 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any": model.get_response = wrapped_get_response + # Also wrap stream_response for streaming support + if hasattr(model, "stream_response"): + original_stream_response = model.stream_response + + @wraps(original_stream_response) + async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any": + """ + Wrap stream_response to create an AI client span for streaming. + stream_response is an async generator, so we yield events within the span. + + Note: stream_response is called with positional args unlike get_response + which uses keyword args. 
The signature is: + stream_response( + system_instructions, # args[0] + input, # args[1] + model_settings, # args[2] + tools, # args[3] + output_schema, # args[4] + handoffs, # args[5] + tracing, # args[6] + *, + previous_response_id, + conversation_id, + prompt, + ) + """ + # Build kwargs dict from positional args for span data capture + span_kwargs = dict(kwargs) + if len(args) > 0: + span_kwargs["system_instructions"] = args[0] + if len(args) > 1: + span_kwargs["input"] = args[1] + + with ai_client_span(agent, span_kwargs) as span: + span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True) + + async for event in original_stream_response(*args, **kwargs): + yield event + + # Get response model if captured + response_model = getattr(agent, "_sentry_raw_response_model", None) + if response_model: + agent_span = getattr(agent, "_sentry_agent_span", None) + if agent_span: + agent_span.set_data( + SPANDATA.GEN_AI_RESPONSE_MODEL, response_model + ) + delattr(agent, "_sentry_raw_response_model") + + model.stream_response = wrapped_stream_response + return model return wrapped_get_model diff --git a/sentry_sdk/integrations/openai_agents/patches/runner.py b/sentry_sdk/integrations/openai_agents/patches/runner.py index 656f509c96..7fa8d1c2a2 100644 --- a/sentry_sdk/integrations/openai_agents/patches/runner.py +++ b/sentry_sdk/integrations/openai_agents/patches/runner.py @@ -76,15 +76,14 @@ def _create_run_streamed_wrapper( Unlike run(), run_streamed() returns immediately with a RunResultStreaming object while execution continues in a background task. The workflow span must stay open throughout the streaming operation and close when streaming completes or is abandoned. + + Note: We don't use isolation_scope() here because it uses context variables that + cannot span async boundaries (the __enter__ and __exit__ would be called from + different async contexts, causing ValueError). """ @wraps(original_func) def wrapper(*args: "Any", **kwargs: "Any") -> "Any": - # Isolate each workflow so that when agents are run in asyncio tasks they - # don't touch each other's scopes - isolation_scope = sentry_sdk.isolation_scope() - isolation_scope.__enter__() - # Clone agent because agent invocation spans are attached per run. 
agent = args[0].clone() @@ -92,9 +91,8 @@ def wrapper(*args: "Any", **kwargs: "Any") -> "Any": workflow_span = agent_workflow_span(agent) workflow_span.__enter__() - # Store span and scope on agent for cleanup + # Store span on agent for cleanup agent._sentry_workflow_span = workflow_span - agent._sentry_isolation_scope = isolation_scope args = (agent, *args[1:]) @@ -104,34 +102,12 @@ def wrapper(*args: "Any", **kwargs: "Any") -> "Any": except Exception as exc: # If run_streamed itself fails (not the background task), clean up immediately workflow_span.__exit__(*sys.exc_info()) - isolation_scope.__exit__(None, None, None) _capture_exception(exc) raise exc from None - # Wrap the result to ensure cleanup when streaming completes - original_aclose = getattr(run_result, "aclose", None) - - async def wrapped_aclose() -> None: - """Close streaming result and clean up Sentry spans""" - try: - if original_aclose is not None: - await original_aclose() - finally: - # End any remaining agent span - if hasattr(run_result, "context_wrapper"): - end_invoke_agent_span(run_result.context_wrapper, agent) - - # End workflow span - if hasattr(agent, "_sentry_workflow_span"): - workflow_span.__exit__(None, None, None) - delattr(agent, "_sentry_workflow_span") - - # Exit isolation scope - if hasattr(agent, "_sentry_isolation_scope"): - isolation_scope.__exit__(None, None, None) - delattr(agent, "_sentry_isolation_scope") - - run_result.aclose = wrapped_aclose + # Store references for cleanup + run_result._sentry_workflow_span = workflow_span + run_result._sentry_agent = agent return run_result
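
Note for reviewers: below is a minimal usage sketch of the code path these patches instrument; it is not part of the patch. It assumes the public openai-agents API (Agent, Runner.run_streamed, RunResultStreaming.stream_events) and a configured OpenAI API key; the DSN, agent name, and prompt are placeholders.

import asyncio

import sentry_sdk
from agents import Agent, Runner
from sentry_sdk.integrations.openai_agents import OpenAIAgentsIntegration


async def main() -> None:
    sentry_sdk.init(
        dsn="...",  # placeholder DSN
        traces_sample_rate=1.0,
        integrations=[OpenAIAgentsIntegration()],
    )

    agent = Agent(name="assistant", instructions="Answer briefly.")

    # run_streamed() returns immediately with a RunResultStreaming object;
    # by this point the patched wrapper has already opened the agent
    # workflow span, since execution continues in a background task.
    result = Runner.run_streamed(agent, "What is the capital of France?")

    # Consuming the stream drives the background task; the patched
    # _run_single_turn_streamed and stream_response create the invoke-agent
    # and AI client spans per turn while the workflow span stays open.
    async for event in result.stream_events():
        pass  # inspect streaming events here as needed

    # Once the final output is produced, execute_final_output ends the
    # agent span and (after PATCH 2/2) the workflow span stored on the agent.


if __name__ == "__main__":
    asyncio.run(main())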