6 changes: 6 additions & 0 deletions sentry_sdk/consts.py
@@ -464,6 +464,12 @@ class SPANDATA:
Example: "ResearchAssistant"
"""

GEN_AI_CONVERSATION_ID = "gen_ai.conversation.id"
"""
The unique identifier for the conversation/thread with the AI model.
Example: "conv_abc123"
"""

GEN_AI_CHOICE = "gen_ai.choice"
"""
The model's response message.
9 changes: 7 additions & 2 deletions sentry_sdk/integrations/openai_agents/__init__.py
@@ -4,6 +4,7 @@
_create_get_model_wrapper,
_create_get_all_tools_wrapper,
_create_run_wrapper,
_create_run_streamed_wrapper,
_patch_agent_run,
_patch_error_tracing,
)
@@ -25,12 +26,16 @@ def _patch_runner() -> None:
# Create the root span for one full agent run (including eventual handoffs)
# Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
# agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
# TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run
)

# Creating the actual spans for each agent run.
# Patch streaming runner
agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run_streamed
)

# Creating the actual spans for each agent run (works for both streaming and non-streaming).
_patch_agent_run()


2 changes: 1 addition & 1 deletion sentry_sdk/integrations/openai_agents/patches/__init__.py
@@ -1,5 +1,5 @@
from .models import _create_get_model_wrapper # noqa: F401
from .tools import _create_get_all_tools_wrapper # noqa: F401
from .runner import _create_run_wrapper # noqa: F401
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
from .agent_run import _patch_agent_run # noqa: F401
from .error_tracing import _patch_error_tracing # noqa: F401
140 changes: 110 additions & 30 deletions sentry_sdk/integrations/openai_agents/patches/agent_run.py
@@ -1,6 +1,7 @@
import sys
from functools import wraps

from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.utils import reraise
from ..spans import (
@@ -31,22 +32,10 @@ def _patch_agent_run() -> None:

# Store original methods
original_run_single_turn = agents.run.AgentRunner._run_single_turn
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output

def _start_invoke_agent_span(
context_wrapper: "agents.RunContextWrapper",
agent: "agents.Agent",
kwargs: "dict[str, Any]",
) -> "Span":
"""Start an agent invocation span"""
# Store the agent on the context wrapper so we can access it later
context_wrapper._sentry_current_agent = agent
span = invoke_agent_span(context_wrapper, agent, kwargs)
context_wrapper._sentry_agent_span = span

return span

def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
"""Check if there's an active agent span for this context"""
return getattr(context_wrapper, "_sentry_current_agent", None) is not None
@@ -57,6 +46,46 @@ def _get_current_agent(
"""Get the current agent from context wrapper"""
return getattr(context_wrapper, "_sentry_current_agent", None)

def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
"""Close the workflow span for streaming executions if it exists."""
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(*sys.exc_info())
delattr(agent, "_sentry_workflow_span")

def _maybe_start_agent_span(
context_wrapper: "agents.RunContextWrapper",
agent: "agents.Agent",
should_run_agent_start_hooks: bool,
span_kwargs: "dict[str, Any]",
is_streaming: bool = False,
) -> "Optional[Span]":
"""
Start an agent invocation span if conditions are met.
Handles ending any existing span for a different agent.

Returns the new span if started, or the existing span if conditions aren't met.
"""
if not (should_run_agent_start_hooks and agent and context_wrapper):
return getattr(context_wrapper, "_sentry_agent_span", None)

# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
end_invoke_agent_span(context_wrapper, current_agent)

# Store the agent on the context wrapper so we can access it later
context_wrapper._sentry_current_agent = agent
span = invoke_agent_span(context_wrapper, agent, span_kwargs)
context_wrapper._sentry_agent_span = span
agent._sentry_agent_span = span

if is_streaming:
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

return span

@wraps(
original_run_single_turn.__func__
if hasattr(original_run_single_turn, "__func__")
@@ -68,28 +97,18 @@ async def patched_run_single_turn(
"""Patched _run_single_turn that creates agent invocation spans"""
agent = kwargs.get("agent")
context_wrapper = kwargs.get("context_wrapper")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)

span = getattr(context_wrapper, "_sentry_agent_span", None)
# Start agent span when agent starts (but only once per agent)
if should_run_agent_start_hooks and agent and context_wrapper:
# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
end_invoke_agent_span(context_wrapper, current_agent)
span = _maybe_start_agent_span(
context_wrapper, agent, should_run_agent_start_hooks, kwargs
)

span = _start_invoke_agent_span(context_wrapper, agent, kwargs)
agent._sentry_agent_span = span

# Call original method with all the correct parameters
try:
result = await original_run_single_turn(*args, **kwargs)
except Exception as exc:
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)

reraise(*sys.exc_info())

return result
@@ -117,7 +136,9 @@ async def patched_execute_handoffs(
# Call original method with all parameters
try:
result = await original_execute_handoffs(*args, **kwargs)

except Exception:
_close_streaming_workflow_span(agent)
raise
finally:
# End span for current agent after handoff processing is complete
if agent and context_wrapper and _has_active_agent_span(context_wrapper):
@@ -139,18 +160,77 @@ async def patched_execute_final_output(
context_wrapper = kwargs.get("context_wrapper")
final_output = kwargs.get("final_output")

# Call original method with all parameters
try:
result = await original_execute_final_output(*args, **kwargs)
finally:
# End span for current agent after final output processing is complete
if agent and context_wrapper and _has_active_agent_span(context_wrapper):
end_invoke_agent_span(context_wrapper, agent, final_output)
# For streaming, close the workflow span (non-streaming uses context manager in _create_run_wrapper)
_close_streaming_workflow_span(agent)

return result

@wraps(
original_run_single_turn_streamed.__func__
if hasattr(original_run_single_turn_streamed, "__func__")
else original_run_single_turn_streamed
)
async def patched_run_single_turn_streamed(
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "Any":
"""Patched _run_single_turn_streamed that creates agent invocation spans for streaming.

Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
_run_single_turn_streamed uses positional arguments. The call signature is:
_run_single_turn_streamed(
streamed_result, # args[0]
agent, # args[1]
hooks, # args[2]
context_wrapper, # args[3]
run_config, # args[4]
should_run_agent_start_hooks, # args[5]
tool_use_tracker, # args[6]
all_tools, # args[7]
server_conversation_tracker, # args[8] (optional)
)
"""
streamed_result = args[0] if len(args) > 0 else kwargs.get("streamed_result")
agent = args[1] if len(args) > 1 else kwargs.get("agent")
context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
should_run_agent_start_hooks = bool(
args[5]
if len(args) > 5
else kwargs.get("should_run_agent_start_hooks", False)
)

span_kwargs: "dict[str, Any]" = {}
if streamed_result and hasattr(streamed_result, "input"):
span_kwargs["original_input"] = streamed_result.input

span = _maybe_start_agent_span(
context_wrapper,
agent,
should_run_agent_start_hooks,
span_kwargs,
is_streaming=True,
)

try:
result = await original_run_single_turn_streamed(*args, **kwargs)
except Exception as exc:
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)
_close_streaming_workflow_span(agent)
reraise(*sys.exc_info())

return result

# Apply patches
agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
patched_run_single_turn_streamed
)
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
agents._run_impl.RunImpl.execute_final_output = classmethod(
patched_execute_final_output
80 changes: 66 additions & 14 deletions sentry_sdk/integrations/openai_agents/patches/models.py
@@ -1,4 +1,5 @@
import copy
import sys
from functools import wraps

from sentry_sdk.integrations import DidNotEnable
@@ -9,15 +10,24 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Callable

from typing import Any, Callable, Optional

try:
import agents
except ImportError:
raise DidNotEnable("OpenAI Agents not installed")


def _set_response_model_on_agent_span(
agent: "agents.Agent", response_model: "Optional[str]"
) -> None:
"""Set the response model on the agent's invoke_agent span if available."""
if response_model:
agent_span = getattr(agent, "_sentry_agent_span", None)
if agent_span:
agent_span.set_data(SPANDATA.GEN_AI_RESPONSE_MODEL, response_model)


def _create_get_model_wrapper(
original_get_model: "Callable[..., Any]",
) -> "Callable[..., Any]":
@@ -37,15 +47,19 @@ def wrapped_get_model(
# because we only patch its direct methods, all underlying data can remain unchanged.
model = copy.copy(original_get_model(agent, run_config))

# Wrap _fetch_response if it exists (for OpenAI models) to capture raw response model
# Capture the request model name for spans (agent.model can be None when using defaults)
request_model_name = model.model if hasattr(model, "model") else str(model)
agent._sentry_request_model = request_model_name

# Wrap _fetch_response if it exists (for OpenAI models) to capture response model
if hasattr(model, "_fetch_response"):
original_fetch_response = model._fetch_response

@wraps(original_fetch_response)
async def wrapped_fetch_response(*args: "Any", **kwargs: "Any") -> "Any":
response = await original_fetch_response(*args, **kwargs)
if hasattr(response, "model"):
agent._sentry_raw_response_model = str(response.model)
if hasattr(response, "model") and response.model:
agent._sentry_response_model = str(response.model)
return response

model._fetch_response = wrapped_fetch_response
@@ -57,22 +71,60 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":
with ai_client_span(agent, kwargs) as span:
result = await original_get_response(*args, **kwargs)

response_model = getattr(agent, "_sentry_raw_response_model", None)
# Get response model captured from _fetch_response and clean up
response_model = getattr(agent, "_sentry_response_model", None)
if response_model:
agent_span = getattr(agent, "_sentry_agent_span", None)
if agent_span:
agent_span.set_data(
SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
)
delattr(agent, "_sentry_response_model")

delattr(agent, "_sentry_raw_response_model")

update_ai_client_span(span, agent, kwargs, result, response_model)
_set_response_model_on_agent_span(agent, response_model)
update_ai_client_span(span, result, response_model, agent)

return result

model.get_response = wrapped_get_response

# Also wrap stream_response for streaming support
if hasattr(model, "stream_response"):
original_stream_response = model.stream_response

@wraps(original_stream_response)
async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
# Uses explicit try/finally instead of context manager to ensure cleanup
# even if the consumer abandons the stream (GeneratorExit).
span_kwargs = dict(kwargs)
if len(args) > 0:
span_kwargs["system_instructions"] = args[0]
if len(args) > 1:
span_kwargs["input"] = args[1]

span = ai_client_span(agent, span_kwargs)
span.__enter__()
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

streaming_response = None
try:
async for event in original_stream_response(*args, **kwargs):
# Capture the full response from ResponseCompletedEvent
if hasattr(event, "response"):
streaming_response = event.response
yield event

# Update span with response data (usage, output, model)
if streaming_response:
response_model = (
str(streaming_response.model)
if hasattr(streaming_response, "model")
and streaming_response.model
else None
)

_set_response_model_on_agent_span(agent, response_model)
update_ai_client_span(span, streaming_response, agent=agent)
finally:
span.__exit__(*sys.exc_info())
Comment on lines +119 to +124

Bug: The _sentry_conversation_id is not propagated to secondary agents during handoffs, causing subsequent spans for that agent to lack the conversation ID and breaking trace continuity.
Severity: HIGH

Suggested Fix

Propagate the _sentry_conversation_id to handoff agents before they are executed. This could be achieved by setting the attribute on the handoff agent object before it is used in _run_single_turn, or by storing the conversation_id in a context that persists across handoffs, ensuring all agents in the chain have access to it.
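
A minimal sketch of the first approach, assuming it runs where the handoff target is known (e.g., in the patched handoff handling). `new_agent` is a hypothetical name for the handoff target; only `_sentry_conversation_id` comes from this review:

    # Hypothetical sketch: before the handoff target runs, copy the
    # conversation ID from the current agent so spans created for the
    # secondary agent (invoke_agent_span, ai_client_span,
    # execute_tool_span) remain linked to the conversation.
    conversation_id = getattr(agent, "_sentry_conversation_id", None)
    if conversation_id is not None and new_agent is not None:
        new_agent._sentry_conversation_id = conversation_id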

Potential issue: The `_sentry_conversation_id` attribute is set on the initial agent at the start of an execution run. However, when this agent hands off control to a secondary agent, the `_sentry_conversation_id` is not propagated. As a result, any subsequent spans created for the secondary agent, such as `invoke_agent_span`, `ai_client_span`, and `execute_tool_span`, will be missing the conversation ID. This leads to incomplete trace data, as the spans from the secondary agent cannot be linked to the overall conversation.



model.stream_response = wrapped_stream_response

return model

return wrapped_get_model
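
For context, a rough sketch of how the streaming path covered by these patches is typically driven (assuming the openai-agents package is installed with this integration enabled and an OpenAI API key is in the environment; the DSN, agent name, and prompt are placeholders):

    import asyncio

    import sentry_sdk
    from agents import Agent, Runner

    # Placeholder DSN from the Sentry docs; use your project's DSN.
    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
        traces_sample_rate=1.0,
    )

    agent = Agent(name="ResearchAssistant", instructions="Answer research questions.")

    async def main() -> None:
        # Runner.run_streamed drives _run_single_turn_streamed and
        # stream_response, so the patches above create agent invocation
        # and AI client spans with GEN_AI_RESPONSE_STREAMING set.
        result = Runner.run_streamed(agent, "Hello")
        async for event in result.stream_events():
            pass  # consume the stream; spans close when it finishes

    asyncio.run(main())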