diff --git a/sentry_sdk/integrations/openai_agents/__init__.py b/sentry_sdk/integrations/openai_agents/__init__.py
index b121557fbd..deb136de01 100644
--- a/sentry_sdk/integrations/openai_agents/__init__.py
+++ b/sentry_sdk/integrations/openai_agents/__init__.py
@@ -4,6 +4,7 @@
     _create_get_model_wrapper,
     _create_get_all_tools_wrapper,
     _create_run_wrapper,
+    _create_run_streamed_wrapper,
     _patch_agent_run,
     _patch_error_tracing,
 )
@@ -25,12 +26,16 @@ def _patch_runner() -> None:
     # Create the root span for one full agent run (including eventual handoffs)
     # Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
     # agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
-    # TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
     agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
         agents.run.DEFAULT_AGENT_RUNNER.run
     )
 
-    # Creating the actual spans for each agent run.
+    # Patch streaming runner
+    agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
+        agents.run.DEFAULT_AGENT_RUNNER.run_streamed
+    )
+
+    # Creating the actual spans for each agent run (works for both streaming and non-streaming).
     _patch_agent_run()
diff --git a/sentry_sdk/integrations/openai_agents/patches/__init__.py b/sentry_sdk/integrations/openai_agents/patches/__init__.py
index 33058f01a1..b53ca79e19 100644
--- a/sentry_sdk/integrations/openai_agents/patches/__init__.py
+++ b/sentry_sdk/integrations/openai_agents/patches/__init__.py
@@ -1,5 +1,5 @@
 from .models import _create_get_model_wrapper  # noqa: F401
 from .tools import _create_get_all_tools_wrapper  # noqa: F401
-from .runner import _create_run_wrapper  # noqa: F401
+from .runner import _create_run_wrapper, _create_run_streamed_wrapper  # noqa: F401
 from .agent_run import _patch_agent_run  # noqa: F401
 from .error_tracing import _patch_error_tracing  # noqa: F401
diff --git a/sentry_sdk/integrations/openai_agents/patches/agent_run.py b/sentry_sdk/integrations/openai_agents/patches/agent_run.py
index 29649af945..53fe2ccdc2 100644
--- a/sentry_sdk/integrations/openai_agents/patches/agent_run.py
+++ b/sentry_sdk/integrations/openai_agents/patches/agent_run.py
@@ -1,6 +1,7 @@
 import sys
 from functools import wraps
 
+from sentry_sdk.consts import SPANDATA
 from sentry_sdk.integrations import DidNotEnable
 from sentry_sdk.utils import reraise
 from ..spans import (
@@ -31,22 +32,10 @@ def _patch_agent_run() -> None:
     # Store original methods
     original_run_single_turn = agents.run.AgentRunner._run_single_turn
+    original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
     original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
     original_execute_final_output = agents._run_impl.RunImpl.execute_final_output
 
-    def _start_invoke_agent_span(
-        context_wrapper: "agents.RunContextWrapper",
-        agent: "agents.Agent",
-        kwargs: "dict[str, Any]",
-    ) -> "Span":
-        """Start an agent invocation span"""
-        # Store the agent on the context wrapper so we can access it later
-        context_wrapper._sentry_current_agent = agent
-        span = invoke_agent_span(context_wrapper, agent, kwargs)
-        context_wrapper._sentry_agent_span = span
-
-        return span
-
     def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
         """Check if there's an active agent span for this context"""
         return getattr(context_wrapper, "_sentry_current_agent", None) is not None
 
@@ -57,6 +46,46 @@ def _get_current_agent(
         context_wrapper: "agents.RunContextWrapper",
     ) -> "Optional[agents.Agent]":
         """Get the current agent from context wrapper"""
         return getattr(context_wrapper, "_sentry_current_agent", None)
 
+    def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
+        """Close the workflow span for streaming executions if it exists."""
+        if agent and hasattr(agent, "_sentry_workflow_span"):
+            workflow_span = agent._sentry_workflow_span
+            workflow_span.__exit__(*sys.exc_info())
+            delattr(agent, "_sentry_workflow_span")
+
+    def _maybe_start_agent_span(
+        context_wrapper: "agents.RunContextWrapper",
+        agent: "agents.Agent",
+        should_run_agent_start_hooks: bool,
+        span_kwargs: "dict[str, Any]",
+        is_streaming: bool = False,
+    ) -> "Optional[Span]":
+        """
+        Start an agent invocation span if conditions are met.
+        Handles ending any existing span for a different agent.
+
+        Returns the new span if started, or the existing span if conditions aren't met.
+        """
+        if not (should_run_agent_start_hooks and agent and context_wrapper):
+            return getattr(context_wrapper, "_sentry_agent_span", None)
+
+        # End any existing span for a different agent
+        if _has_active_agent_span(context_wrapper):
+            current_agent = _get_current_agent(context_wrapper)
+            if current_agent and current_agent != agent:
+                end_invoke_agent_span(context_wrapper, current_agent)
+
+        # Store the agent on the context wrapper so we can access it later
+        context_wrapper._sentry_current_agent = agent
+        span = invoke_agent_span(context_wrapper, agent, span_kwargs)
+        context_wrapper._sentry_agent_span = span
+        agent._sentry_agent_span = span
+
+        if is_streaming:
+            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
+
+        return span
+
     @wraps(
         original_run_single_turn.__func__
         if hasattr(original_run_single_turn, "__func__")
@@ -68,28 +97,18 @@ async def patched_run_single_turn(
         """Patched _run_single_turn that creates agent invocation spans"""
         agent = kwargs.get("agent")
         context_wrapper = kwargs.get("context_wrapper")
-        should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks")
+        should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)
 
-        span = getattr(context_wrapper, "_sentry_agent_span", None)
-        # Start agent span when agent starts (but only once per agent)
-        if should_run_agent_start_hooks and agent and context_wrapper:
-            # End any existing span for a different agent
-            if _has_active_agent_span(context_wrapper):
-                current_agent = _get_current_agent(context_wrapper)
-                if current_agent and current_agent != agent:
-                    end_invoke_agent_span(context_wrapper, current_agent)
+        span = _maybe_start_agent_span(
+            context_wrapper, agent, should_run_agent_start_hooks, kwargs
+        )
 
-            span = _start_invoke_agent_span(context_wrapper, agent, kwargs)
-            agent._sentry_agent_span = span
-
-        # Call original method with all the correct parameters
         try:
             result = await original_run_single_turn(*args, **kwargs)
         except Exception as exc:
             if span is not None and span.timestamp is None:
                 _record_exception_on_span(span, exc)
                 end_invoke_agent_span(context_wrapper, agent)
-
             reraise(*sys.exc_info())
 
         return result
 
@@ -117,7 +136,9 @@ async def patched_execute_handoffs(
         # Call original method with all parameters
         try:
             result = await original_execute_handoffs(*args, **kwargs)
-
+        except Exception:
+            _close_streaming_workflow_span(agent)
+            raise
         finally:
             # End span for current agent after handoff processing is complete
             if agent and context_wrapper and _has_active_agent_span(context_wrapper):
@@ -139,18 +160,77 @@ async def patched_execute_final_output(
         context_wrapper = kwargs.get("context_wrapper")
         final_output = kwargs.get("final_output")
 
-        # Call original method with all parameters
         try:
             result = await original_execute_final_output(*args, **kwargs)
         finally:
-            # End span for current agent after final output processing is complete
             if agent and context_wrapper and _has_active_agent_span(context_wrapper):
                 end_invoke_agent_span(context_wrapper, agent, final_output)
+            # For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
+            _close_streaming_workflow_span(agent)
 
         return result
 
+    @wraps(
+        original_run_single_turn_streamed.__func__
+        if hasattr(original_run_single_turn_streamed, "__func__")
+        else original_run_single_turn_streamed
+    )
+    async def patched_run_single_turn_streamed(
+        cls: "agents.Runner", *args: "Any", **kwargs: "Any"
+    ) -> "Any":
+        """Patched _run_single_turn_streamed that creates agent invocation spans for streaming.
+
+        Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
+        _run_single_turn_streamed uses positional arguments. The call signature is:
+            _run_single_turn_streamed(
+                streamed_result,               # args[0]
+                agent,                         # args[1]
+                hooks,                         # args[2]
+                context_wrapper,               # args[3]
+                run_config,                    # args[4]
+                should_run_agent_start_hooks,  # args[5]
+                tool_use_tracker,              # args[6]
+                all_tools,                     # args[7]
+                server_conversation_tracker,   # args[8] (optional)
+            )
+        """
+        streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
+        agent = args[1] if len(args) > 1 else kwargs.get("agent")
+        context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
+        should_run_agent_start_hooks = bool(
+            args[5]
+            if len(args) > 5
+            else kwargs.get("should_run_agent_start_hooks", False)
+        )
+
+        span_kwargs: "dict[str, Any]" = {}
+        if streamed_result and hasattr(streamed_result, "input"):
+            span_kwargs["original_input"] = streamed_result.input
+
+        span = _maybe_start_agent_span(
+            context_wrapper,
+            agent,
+            should_run_agent_start_hooks,
+            span_kwargs,
+            is_streaming=True,
+        )
+
+        try:
+            result = await original_run_single_turn_streamed(*args, **kwargs)
+        except Exception as exc:
+            if span is not None and span.timestamp is None:
+                _record_exception_on_span(span, exc)
+                end_invoke_agent_span(context_wrapper, agent)
+                _close_streaming_workflow_span(agent)
+            reraise(*sys.exc_info())
+
+        return result
 
     # Apply patches
     agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
+    agents.run.AgentRunner._run_single_turn_streamed = classmethod(
+        patched_run_single_turn_streamed
+    )
     agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
     agents._run_impl.RunImpl.execute_final_output = classmethod(
         patched_execute_final_output
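
[Editor's note] A runnable sketch (not part of the patch) of the `__func__` handling that the `@wraps(...)` calls above rely on: a classmethod accessed on its class is a bound method whose metadata lives on `__func__`, and re-wrapping the patched function with `classmethod(...)` restores the original calling convention. All names here are illustrative.

```python
import asyncio
from functools import wraps

class AgentRunner:
    @classmethod
    async def _run_single_turn(cls, *args, **kwargs):
        return "turn"

# Bound classmethod: __name__ and friends live on original.__func__
original = AgentRunner._run_single_turn

@wraps(original.__func__ if hasattr(original, "__func__") else original)
async def patched(cls, *args, **kwargs):
    # a real patch would open/close spans around this call
    return await original(*args, **kwargs)

# Re-wrap as a classmethod so callers keep the same calling convention
AgentRunner._run_single_turn = classmethod(patched)

assert asyncio.run(AgentRunner._run_single_turn()) == "turn"
assert AgentRunner._run_single_turn.__name__ == "_run_single_turn"  # preserved by @wraps
```
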
diff --git a/sentry_sdk/integrations/openai_agents/patches/models.py b/sentry_sdk/integrations/openai_agents/patches/models.py
index a9b3c16a22..b8259a7724 100644
--- a/sentry_sdk/integrations/openai_agents/patches/models.py
+++ b/sentry_sdk/integrations/openai_agents/patches/models.py
@@ -1,4 +1,5 @@
 import copy
+import sys
 from functools import wraps
 
 from sentry_sdk.integrations import DidNotEnable
@@ -9,8 +10,7 @@
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
-    from typing import Any, Callable
-
+    from typing import Any, Callable, Optional
 
 try:
     import agents
@@ -18,6 +18,16 @@
     raise DidNotEnable("OpenAI Agents not installed")
 
 
+def _set_response_model_on_agent_span(
+    agent: "agents.Agent", response_model: "Optional[str]"
+) -> None:
+    """Set the response model on the agent's invoke_agent span if available."""
+    if response_model:
+        agent_span = getattr(agent, "_sentry_agent_span", None)
+        if agent_span:
+            agent_span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
+
+
 def _create_get_model_wrapper(
     original_get_model: "Callable[..., Any]",
 ) -> "Callable[..., Any]":
@@ -37,15 +47,19 @@ def wrapped_get_model(
         # because we only patch its direct methods, all underlying data can remain unchanged.
         model = copy.copy(original_get_model(agent, run_config))
 
-        # Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model
+        # Capture the request model name for spans (agent.model can be None when using defaults)
+        request_model_name = model.model if hasattr(model, "model") else str(model)
+        agent._sentry_request_model = request_model_name
+
+        # Wrap _fetch_response if it exists (for OpenAI models) to capture response model
         if hasattr(model, "_fetch_response"):
             original_fetch_response = model._fetch_response
 
             @wraps(original_fetch_response)
             async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
                 response = await original_fetch_response(*args, **kwargs)
-                if hasattr(response, "model"):
-                    agent._sentry_raw_response_model = str(response.model)
+                if hasattr(response, "model") and response.model:
+                    agent._sentry_response_model = str(response.model)
                 return response
 
             model._fetch_response = wrapped_fetch_response
@@ -57,22 +71,59 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
             with ai_client_span(agent, kwargs) as span:
                 result = await original_get_response(*args, **kwargs)
 
-                response_model = getattr(agent, "_sentry_raw_response_model", None)
+                # Get response model captured from _fetch_response and clean up
+                response_model = getattr(agent, "_sentry_response_model", None)
                 if response_model:
-                    agent_span = getattr(agent, "_sentry_agent_span", None)
-                    if agent_span:
-                        agent_span.set_data(
-                            SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
-                        )
+                    delattr(agent, "_sentry_response_model")
 
-                    delattr(agent, "_sentry_raw_response_model")
-
-                update_ai_client_span(span, agent, kwargs, result, response_model)
+                _set_response_model_on_agent_span(agent, response_model)
+                update_ai_client_span(span, result, response_model)
 
             return result
 
         model.get_response = wrapped_get_response
 
+        # Also wrap stream_response for streaming support
+        if hasattr(model, "stream_response"):
+            original_stream_response = model.stream_response
+
+            @wraps(original_stream_response)
+            async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
+                # Uses explicit try/finally instead of context manager to ensure cleanup
+                # even if the consumer abandons the stream (GeneratorExit).
+                span_kwargs = dict(kwargs)
+                if len(args) > 0:
+                    span_kwargs["system_instructions"] = args[0]
+                if len(args) > 1:
+                    span_kwargs["input"] = args[1]
+
+                span = ai_client_span(agent, span_kwargs)
+                span.__enter__()
+                span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
+
+                streaming_response = None
+                try:
+                    async for event in original_stream_response(*args, **kwargs):
+                        # Capture the full response from ResponseCompletedEvent
+                        if hasattr(event, "response"):
+                            streaming_response = event.response
+                        yield event
+
+                    # Update span with response data (usage, output, model)
+                    if streaming_response:
+                        response_model = (
+                            str(streaming_response.model)
+                            if hasattr(streaming_response, "model")
+                            and streaming_response.model
+                            else None
+                        )
+                        _set_response_model_on_agent_span(agent, response_model)
+                        update_ai_client_span(span, streaming_response)
+                finally:
+                    span.__exit__(*sys.exc_info())
+
+            model.stream_response = wrapped_stream_response
+
         return model
 
     return wrapped_get_model
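
[Editor's note] A minimal, self-contained sketch (not part of the patch) of the span lifecycle that `wrapped_stream_response` above uses: the span is entered before the first `yield` and exited in a `finally`, which also runs when the consumer abandons the stream and `GeneratorExit` is raised at the `yield`. The `Span` class below is a stand-in for sentry_sdk's span.

```python
import asyncio
import sys

class Span:
    """Stand-in for sentry_sdk's span context manager."""

    def __enter__(self):
        print("span opened")
        return self

    def __exit__(self, *exc_info):
        print("span closed")

async def stream():
    span = Span()
    span.__enter__()  # explicit enter, mirroring the patch above
    try:
        for i in range(5):
            yield i
    finally:
        # Runs on normal completion, on errors, and when the consumer
        # abandons the stream (GeneratorExit raised at the yield).
        span.__exit__(*sys.exc_info())

async def main():
    async for item in stream():
        if item == 1:
            break  # abandon the stream early

asyncio.run(main())  # still prints "span closed": asyncio finalizes the generator
```
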
diff --git a/sentry_sdk/integrations/openai_agents/patches/runner.py b/sentry_sdk/integrations/openai_agents/patches/runner.py
index 1d3bbc894b..7fa8d1c2a2 100644
--- a/sentry_sdk/integrations/openai_agents/patches/runner.py
+++ b/sentry_sdk/integrations/openai_agents/patches/runner.py
@@ -1,3 +1,4 @@
+import sys
 from functools import wraps
 
 import sentry_sdk
@@ -64,3 +65,50 @@ async def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
         return run_result
 
     return wrapper
+
+
+def _create_run_streamed_wrapper(
+    original_func: "Callable[..., Any]",
+) -> "Callable[..., Any]":
+    """
+    Wraps the agents.Runner.run_streamed method to create a root span for streaming agent workflow runs.
+
+    Unlike run(), run_streamed() returns immediately with a RunResultStreaming object
+    while execution continues in a background task. The workflow span must stay open
+    throughout the streaming operation and close when streaming completes or is abandoned.
+
+    Note: We don't use isolation_scope() here because it uses context variables that
+    cannot span async boundaries (the __enter__ and __exit__ would be called from
+    different async contexts, causing ValueError).
+    """
+
+    @wraps(original_func)
+    def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
+        # Clone agent because agent invocation spans are attached per run.
+        agent = args[0].clone()
+
+        # Start workflow span immediately (before run_streamed returns)
+        workflow_span = agent_workflow_span(agent)
+        workflow_span.__enter__()
+
+        # Store span on agent for cleanup
+        agent._sentry_workflow_span = workflow_span
+
+        args = (agent, *args[1:])
+
+        try:
+            # Call original function to get RunResultStreaming
+            run_result = original_func(*args, **kwargs)
+        except Exception as exc:
+            # If run_streamed itself fails (not the background task), clean up immediately
+            workflow_span.__exit__(*sys.exc_info())
+            _capture_exception(exc)
+            raise exc from None
+
+        # Store references for cleanup
+        run_result._sentry_workflow_span = workflow_span
+        run_result._sentry_agent = agent
+
+        return run_result
+
+    return wrapper
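
[Editor's note] A short demo (not part of the patch) of the ValueError the docstring above refers to: a token returned by `ContextVar.set()` can only be reset in the context that created it, so a contextvars-based scope entered inside `run_streamed()` could not be exited later from the background task. Carrying the raw span object on the result, as the wrapper does, avoids this.

```python
import asyncio
import contextvars

current = contextvars.ContextVar("current", default=None)

class ContextVarScope:
    """Toy scope that, like isolation_scope(), leans on contextvars."""

    def __enter__(self):
        self._token = current.set(self)
        return self

    def __exit__(self, *exc_info):
        current.reset(self._token)  # only valid in the creating context

async def background(scope):
    scope.__exit__(None, None, None)  # exit from a different async context

async def main():
    scope = ContextVarScope().__enter__()  # entered in this task's context
    task = asyncio.create_task(background(scope))  # task runs in a copied context
    try:
        await task
    except ValueError as err:
        print("cross-context exit failed:", err)

asyncio.run(main())
```
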
diff --git a/sentry_sdk/integrations/openai_agents/spans/ai_client.py b/sentry_sdk/integrations/openai_agents/spans/ai_client.py
index 1e188aa097..c099f133f4 100644
--- a/sentry_sdk/integrations/openai_agents/spans/ai_client.py
+++ b/sentry_sdk/integrations/openai_agents/spans/ai_client.py
@@ -21,7 +21,13 @@ def ai_client_span(
     agent: "Agent", get_response_kwargs: "dict[str, Any]"
 ) -> "sentry_sdk.tracing.Span":
     # TODO-anton: implement other types of operations. Now "chat" is hardcoded.
-    model_name = agent.model.model if hasattr(agent.model, "model") else agent.model
+    # Get model name from agent.model or fall back to request model (for when agent.model is None/default)
+    model_name = None
+    if agent.model:
+        model_name = agent.model.model if hasattr(agent.model, "model") else agent.model
+    elif hasattr(agent, "_sentry_request_model"):
+        model_name = agent._sentry_request_model
+
     span = sentry_sdk.start_span(
         op=OP.GEN_AI_CHAT,
         description=f"chat {model_name}",
@@ -38,15 +44,18 @@ def ai_client_span(
 
 def update_ai_client_span(
     span: "sentry_sdk.tracing.Span",
-    agent: "Agent",
-    get_response_kwargs: "dict[str, Any]",
-    result: "Any",
+    response: "Any",
     response_model: "Optional[str]" = None,
 ) -> None:
-    _set_usage_data(span, result.usage)
-    _set_output_data(span, result)
-    _create_mcp_execute_tool_spans(span, result)
+    """Update AI client span with response data (works for streaming and non-streaming)."""
+    if hasattr(response, "usage") and response.usage:
+        _set_usage_data(span, response.usage)
+
+    if hasattr(response, "output") and response.output:
+        _set_output_data(span, response)
+        _create_mcp_execute_tool_spans(span, response)
 
-    # Set response model if captured from raw response
     if response_model is not None:
         span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)
+    elif hasattr(response, "model") and response.model:
+        span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, str(response.model))
diff --git a/sentry_sdk/integrations/openai_agents/utils.py b/sentry_sdk/integrations/openai_agents/utils.py
index a24d0e909d..a77d9f46ea 100644
--- a/sentry_sdk/integrations/openai_agents/utils.py
+++ b/sentry_sdk/integrations/openai_agents/utils.py
@@ -63,8 +63,14 @@ def _set_agent_data(span: "sentry_sdk.tracing.Span", agent: "agents.Agent") -> N
             SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, agent.model_settings.max_tokens
         )
 
+    # Get model name from agent.model or fall back to request model (for when agent.model is None/default)
+    model_name = None
     if agent.model:
         model_name = agent.model.model if hasattr(agent.model, "model") else agent.model
+    elif hasattr(agent, "_sentry_request_model"):
+        model_name = agent._sentry_request_model
+
+    if model_name:
         span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model_name)
 
     if agent.model_settings.presence_penalty:
diff --git a/tests/integrations/openai_agents/test_openai_agents.py b/tests/integrations/openai_agents/test_openai_agents.py
index c5cb25dfee..89798775a1 100644
--- a/tests/integrations/openai_agents/test_openai_agents.py
+++ b/tests/integrations/openai_agents/test_openai_agents.py
@@ -1465,7 +1465,7 @@ async def test_ai_client_span_includes_response_model(
 ):
     """
     Test that ai_client spans (gen_ai.chat) include the response model from the actual API response.
-    This verifies the new functionality to capture the model used in the response.
+    This verifies we capture the actual model used (which may differ from the requested model).
""" with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): @@ -1473,7 +1473,7 @@ async def test_ai_client_span_includes_response_model( with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # Create a mock OpenAI Response object with a model field + # Create a mock OpenAI Response object with a specific model version mock_response = MagicMock() mock_response.model = "gpt-4.1-2025-04-14" # The actual response model mock_response.id = "resp_123" @@ -1523,7 +1523,7 @@ async def test_ai_client_span_includes_response_model( spans = transaction["spans"] _, ai_client_span = spans - # Verify ai_client span has response model + # Verify ai_client span has response model from API response assert ai_client_span["description"] == "chat gpt-4" assert "gen_ai.response.model" in ai_client_span["data"] assert ai_client_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" @@ -1545,13 +1545,13 @@ async def test_ai_client_span_response_model_with_chat_completions( ) with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): - # Mock the get_response method directly since ChatCompletions may use Responses API anyway + # Mock the _fetch_response method with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # Create a mock Response object with a model field + # Create a mock Response object mock_response = MagicMock() - mock_response.model = "gpt-4o-mini-2024-07-18" # Actual response model + mock_response.model = "gpt-4o-mini-2024-07-18" mock_response.id = "resp_123" mock_response.output = [ ResponseOutputMessage( @@ -1598,7 +1598,7 @@ async def test_ai_client_span_response_model_with_chat_completions( spans = transaction["spans"] _, ai_client_span = spans - # Verify response model from Response is captured + # Verify response model from API response is captured assert "gen_ai.response.model" in ai_client_span["data"] assert ai_client_span["data"]["gen_ai.response.model"] == "gpt-4o-mini-2024-07-18" @@ -1711,41 +1711,45 @@ async def test_response_model_not_set_when_unavailable( sentry_init, capture_events, test_agent ): """ - Test that response model is not set if the raw response doesn't have a model field. - This can happen with custom model implementations. + Test that response model is not set if the API response doesn't have a model field. + The request model should still be set correctly. 
""" with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): - # Mock without _fetch_response (simulating custom model without this method) with patch( - "agents.models.openai_responses.OpenAIResponsesModel.get_response" - ) as mock_get_response: - response = ModelResponse( - output=[ - ResponseOutputMessage( - id="msg_123", - type="message", - status="completed", - content=[ - ResponseOutputText( - text="Response without model field", - type="output_text", - annotations=[], - ) - ], - role="assistant", - ) - ], - usage=Usage( - requests=1, - input_tokens=10, - output_tokens=20, - total_tokens=30, - ), - response_id="resp_123", + "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" + ) as mock_fetch_response: + # Create a mock response without a model field + mock_response = MagicMock() + mock_response.model = None # No model in response + mock_response.id = "resp_123" + mock_response.output = [ + ResponseOutputMessage( + id="msg_123", + type="message", + status="completed", + content=[ + ResponseOutputText( + text="Response without model field", + type="output_text", + annotations=[], + ) + ], + role="assistant", + ) + ] + mock_response.usage = MagicMock() + mock_response.usage.input_tokens = 10 + mock_response.usage.output_tokens = 20 + mock_response.usage.total_tokens = 30 + mock_response.usage.input_tokens_details = InputTokensDetails( + cached_tokens=0 + ) + mock_response.usage.output_tokens_details = OutputTokensDetails( + reasoning_tokens=0 ) - # Don't set _sentry_response_model attribute - mock_get_response.return_value = response + + mock_fetch_response.return_value = mock_response sentry_init( integrations=[OpenAIAgentsIntegration()], @@ -1754,25 +1758,21 @@ async def test_response_model_not_set_when_unavailable( events = capture_events() - # Remove the _fetch_response method to simulate custom model - with patch.object( - agents.models.openai_responses.OpenAIResponsesModel, - "_fetch_response", - None, - ): - result = await agents.Runner.run( - test_agent, "Test input", run_config=test_run_config - ) + result = await agents.Runner.run( + test_agent, "Test input", run_config=test_run_config + ) - assert result is not None + assert result is not None (transaction,) = events spans = transaction["spans"] _, ai_client_span = spans - # When response model can't be captured, it shouldn't be in the span data - # (we only set it when we can accurately capture it) + # Response model should NOT be set when API doesn't return it assert "gen_ai.response.model" not in ai_client_span["data"] + # But request model should still be set + assert "gen_ai.request.model" in ai_client_span["data"] + assert ai_client_span["data"]["gen_ai.request.model"] == "gpt-4" @pytest.mark.asyncio @@ -1780,15 +1780,14 @@ async def test_invoke_agent_span_includes_response_model( sentry_init, capture_events, test_agent ): """ - Test that invoke_agent spans include the response model. - When an agent makes multiple LLM calls, it should report the last model used. + Test that invoke_agent spans include the response model from the API response. 
""" with patch.dict(os.environ, {"OPENAI_API_KEY": "test-key"}): with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # Create a mock OpenAI Response object with a model field + # Create a mock OpenAI Response object with a specific model version mock_response = MagicMock() mock_response.model = "gpt-4.1-2025-04-14" # The actual response model mock_response.id = "resp_123" @@ -1838,7 +1837,7 @@ async def test_invoke_agent_span_includes_response_model( spans = transaction["spans"] invoke_agent_span, ai_client_span = spans - # Verify invoke_agent span has response model + # Verify invoke_agent span has response model from API assert invoke_agent_span["description"] == "invoke_agent test_agent" assert "gen_ai.response.model" in invoke_agent_span["data"] assert invoke_agent_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" @@ -1868,7 +1867,7 @@ def calculator(a: int, b: int) -> int: with patch( "agents.models.openai_responses.OpenAIResponsesModel._fetch_response" ) as mock_fetch_response: - # First call: gpt-4 model + # First call: gpt-4 model returns tool call first_response = MagicMock() first_response.model = "gpt-4-0613" first_response.id = "resp_1" @@ -1892,9 +1891,9 @@ def calculator(a: int, b: int) -> int: reasoning_tokens=0 ) - # Second call: different model (e.g., after tool execution) + # Second call: different model version returns final message second_response = MagicMock() - second_response.model = "gpt-4.1-2025-04-14" # Different model + second_response.model = "gpt-4.1-2025-04-14" second_response.id = "resp_2" second_response.output = [ ResponseOutputMessage( @@ -1946,11 +1945,11 @@ def calculator(a: int, b: int) -> int: first_ai_client_span = spans[1] second_ai_client_span = spans[3] # After tool span - # Verify invoke_agent span uses the LAST response model + # Invoke_agent span uses the LAST response model assert "gen_ai.response.model" in invoke_agent_span["data"] assert invoke_agent_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" - # Verify each ai_client span has its own response model + # Each ai_client span has its own response model from the API assert first_ai_client_span["data"]["gen_ai.response.model"] == "gpt-4-0613" assert ( second_ai_client_span["data"]["gen_ai.response.model"] == "gpt-4.1-2025-04-14" @@ -1998,3 +1997,73 @@ def test_openai_agents_message_truncation(sentry_init, capture_events): assert len(parsed_messages) == 2 assert "small message 4" in str(parsed_messages[0]) assert "small message 5" in str(parsed_messages[1]) + + +def test_streaming_patches_applied(sentry_init): + """ + Test that the streaming patches are applied correctly. + """ + sentry_init( + integrations=[OpenAIAgentsIntegration()], + traces_sample_rate=1.0, + ) + + # Verify that run_streamed is patched (will have __wrapped__ attribute if patched) + import agents + + # Check that the method exists and has been modified + assert hasattr(agents.run.DEFAULT_AGENT_RUNNER, "run_streamed") + assert hasattr(agents.run.AgentRunner, "_run_single_turn_streamed") + + # Verify the patches were applied by checking for our wrapper + run_streamed_func = agents.run.DEFAULT_AGENT_RUNNER.run_streamed + assert run_streamed_func is not None + + +@pytest.mark.asyncio +async def test_streaming_span_update_captures_response_data( + sentry_init, test_agent, mock_usage +): + """ + Test that update_ai_client_span correctly captures response text, + usage data, and response model from a streaming response. 
+ """ + from sentry_sdk.integrations.openai_agents.spans.ai_client import ( + update_ai_client_span, + ) + + sentry_init( + integrations=[OpenAIAgentsIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + ) + + # Create a mock streaming response object (similar to what we'd get from ResponseCompletedEvent) + mock_streaming_response = MagicMock() + mock_streaming_response.model = "gpt-4-streaming" + mock_streaming_response.usage = mock_usage + mock_streaming_response.output = [ + ResponseOutputMessage( + id="msg_streaming_123", + type="message", + status="completed", + content=[ + ResponseOutputText( + text="Hello from streaming!", + type="output_text", + annotations=[], + ) + ], + role="assistant", + ) + ] + + # Test the unified update function (works for both streaming and non-streaming) + with start_span(op="gen_ai.chat", description="test chat") as span: + update_ai_client_span(span, mock_streaming_response) + + # Verify the span data was set correctly + assert span._data["gen_ai.response.text"] == "Hello from streaming!" + assert span._data["gen_ai.usage.input_tokens"] == 10 + assert span._data["gen_ai.usage.output_tokens"] == 20 + assert span._data["gen_ai.response.model"] == "gpt-4-streaming"