Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion sentry_sdk/integrations/openai_agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .patches import (
_get_model,
_get_all_tools,
_run_single_turn,
_create_run_wrapper,
_create_run_streamed_wrapper,
_patch_agent_run,
Expand Down Expand Up @@ -35,6 +36,13 @@
run_loop = None
turn_preparation = None

from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any

from agents.run_internal.run_steps import SingleStepResult


def _patch_runner() -> None:
# Create the root span for one full agent run (including eventual handoffs)
Expand Down Expand Up @@ -70,7 +78,7 @@ class OpenAIAgentsIntegration(Integration):
3. In a loop, the agent repeatedly calls the Responses API, maintaining a conversation history that includes previous messages and tool results, which is passed to each call.
- A Model instance is created at the start of the loop by calling the `Runner._get_model()`. We patch the Model instance using `patches._get_model()`.
- Available tools are also deteremined at the start of the loop, with `Runner._get_all_tools()`. We patch Tool instances by iterating through the returned tools in `patches._get_all_tools()`.
- In each loop iteration, `run_single_turn()` or `run_single_turn_streamed()` is responsible for calling the Responses API, patched with `patched_run_single_turn()` and `patched_run_single_turn_streamed()`.
- In each loop iteration, `run_single_turn()` or `run_single_turn_streamed()` is responsible for calling the Responses API, patched with `patches._run_single_turn()` and `patched_run_single_turn_streamed()`.
4. On loop termination, `RunImpl.execute_final_output()` is called. The function is patched with `patched_execute_final_output()`.

Local tools are run based on the return value from the Responses API as a post-API call step in the above loop.
Expand Down Expand Up @@ -111,6 +119,15 @@ def new_wrapped_get_model(
return _get_model(turn_preparation.get_model, agent, run_config)

agents.run_internal.run_loop.get_model = new_wrapped_get_model

@wraps(run_loop.run_single_turn)
async def new_wrapped_run_single_turn(
*args: "Any", **kwargs: "Any"
) -> "SingleStepResult":
return await _run_single_turn(run_loop.run_single_turn, *args, **kwargs)

agents.run.run_single_turn = new_wrapped_run_single_turn

return

original_get_all_tools = AgentRunner._get_all_tools
Expand All @@ -134,3 +151,15 @@ def old_wrapped_get_model(
return _get_model(original_get_model, agent, run_config)

agents.run.AgentRunner._get_model = classmethod(old_wrapped_get_model)

original_run_single_turn = AgentRunner._run_single_turn

@wraps(AgentRunner._run_single_turn.__func__)
async def old_wrapped_run_single_turn(
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "SingleStepResult":
return await _run_single_turn(original_run_single_turn, *args, **kwargs)

agents.run.AgentRunner._run_single_turn = classmethod(
old_wrapped_run_single_turn
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .models import _get_model # noqa: F401
from .tools import _get_all_tools # noqa: F401
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
from .agent_run import _patch_agent_run # noqa: F401
from .agent_run import _run_single_turn, _patch_agent_run # noqa: F401
from .error_tracing import _patch_error_tracing # noqa: F401
164 changes: 83 additions & 81 deletions sentry_sdk/integrations/openai_agents/patches/agent_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,108 +14,111 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Any, Optional
from typing import Any, Optional, Callable, Awaitable

from sentry_sdk.tracing import Span

from agents.run_internal.run_steps import SingleStepResult

try:
import agents
except ImportError:
raise DidNotEnable("OpenAI Agents not installed")


def _patch_agent_run() -> None:
"""
Patches AgentRunner methods to create agent invocation spans.
This directly patches the execution flow to track when agents start and stop.
def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
"""Check if there's an active agent span for this context"""
return getattr(context_wrapper, "_sentry_current_agent", None) is not None


def _get_current_agent(
context_wrapper: "agents.RunContextWrapper",
) -> "Optional[agents.Agent]":
"""Get the current agent from context wrapper"""
return getattr(context_wrapper, "_sentry_current_agent", None)


def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
"""Close the workflow span for streaming executions if it exists."""
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(*sys.exc_info())
delattr(agent, "_sentry_workflow_span")


def _maybe_start_agent_span(
context_wrapper: "agents.RunContextWrapper",
agent: "agents.Agent",
should_run_agent_start_hooks: bool,
span_kwargs: "dict[str, Any]",
is_streaming: bool = False,
) -> "Optional[Span]":
"""
Start an agent invocation span if conditions are met.
Handles ending any existing span for a different agent.

# Store original methods
original_run_single_turn = agents.run.AgentRunner._run_single_turn
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output
Returns the new span if started, or the existing span if conditions aren't met.
"""
if not (should_run_agent_start_hooks and agent and context_wrapper):
return getattr(context_wrapper, "_sentry_agent_span", None)

def _has_active_agent_span(context_wrapper: "agents.RunContextWrapper") -> bool:
"""Check if there's an active agent span for this context"""
return getattr(context_wrapper, "_sentry_current_agent", None) is not None

def _get_current_agent(
context_wrapper: "agents.RunContextWrapper",
) -> "Optional[agents.Agent]":
"""Get the current agent from context wrapper"""
return getattr(context_wrapper, "_sentry_current_agent", None)

def _close_streaming_workflow_span(agent: "Optional[agents.Agent]") -> None:
"""Close the workflow span for streaming executions if it exists."""
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(*sys.exc_info())
delattr(agent, "_sentry_workflow_span")

def _maybe_start_agent_span(
context_wrapper: "agents.RunContextWrapper",
agent: "agents.Agent",
should_run_agent_start_hooks: bool,
span_kwargs: "dict[str, Any]",
is_streaming: bool = False,
) -> "Optional[Span]":
"""
Start an agent invocation span if conditions are met.
Handles ending any existing span for a different agent.
# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
end_invoke_agent_span(context_wrapper, current_agent)

Returns the new span if started, or the existing span if conditions aren't met.
"""
if not (should_run_agent_start_hooks and agent and context_wrapper):
return getattr(context_wrapper, "_sentry_agent_span", None)
# Store the agent on the context wrapper so we can access it later
context_wrapper._sentry_current_agent = agent
span = invoke_agent_span(context_wrapper, agent, span_kwargs)
context_wrapper._sentry_agent_span = span
agent._sentry_agent_span = span

# End any existing span for a different agent
if _has_active_agent_span(context_wrapper):
current_agent = _get_current_agent(context_wrapper)
if current_agent and current_agent != agent:
end_invoke_agent_span(context_wrapper, current_agent)
if is_streaming:
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

# Store the agent on the context wrapper so we can access it later
context_wrapper._sentry_current_agent = agent
span = invoke_agent_span(context_wrapper, agent, span_kwargs)
context_wrapper._sentry_agent_span = span
agent._sentry_agent_span = span
return span

if is_streaming:
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

return span
async def _run_single_turn(
original_run_single_turn: "Callable[..., Awaitable[SingleStepResult]]",
*args: "Any",
**kwargs: "Any",
) -> "SingleStepResult":
"""
Patched _run_single_turn that
- creates agent invocation spans if there is no already active agent invocation span.
- ends the agent invocation span if and only if an exception is raised in `_run_single_turn()`.
"""
agent = kwargs.get("agent")
context_wrapper = kwargs.get("context_wrapper")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)

@wraps(
original_run_single_turn.__func__
if hasattr(original_run_single_turn, "__func__")
else original_run_single_turn
span = _maybe_start_agent_span(
context_wrapper, agent, should_run_agent_start_hooks, kwargs
)
async def patched_run_single_turn(
cls: "agents.Runner", *args: "Any", **kwargs: "Any"
) -> "Any":
"""
Patched _run_single_turn that
- creates agent invocation spans if there is no already active agent invocation span.
- ends the agent invocation span if and only if an exception is raised in `_run_single_turn()`.
"""
agent = kwargs.get("agent")
context_wrapper = kwargs.get("context_wrapper")
should_run_agent_start_hooks = kwargs.get("should_run_agent_start_hooks", False)

span = _maybe_start_agent_span(
context_wrapper, agent, should_run_agent_start_hooks, kwargs
)
try:
result = await original_run_single_turn(*args, **kwargs)
except Exception as exc:
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)
reraise(*sys.exc_info())

try:
result = await original_run_single_turn(*args, **kwargs)
except Exception as exc:
if span is not None and span.timestamp is None:
_record_exception_on_span(span, exc)
end_invoke_agent_span(context_wrapper, agent)
reraise(*sys.exc_info())
return result

return result

def _patch_agent_run() -> None:
"""
Patches AgentRunner methods to create agent invocation spans.
This directly patches the execution flow to track when agents start and stop.
"""

# Store original methods
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output

@wraps(
original_execute_handoffs.__func__
Expand Down Expand Up @@ -252,7 +255,6 @@ async def patched_run_single_turn_streamed(
return result

# Apply patches
agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
patched_run_single_turn_streamed
)
Expand Down
Loading