Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions sentry_sdk/integrations/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Supports the low-level `mcp.server.lowlevel.Server` API.
"""

from contextlib import contextmanager
import inspect
from functools import wraps
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -352,6 +353,30 @@ def _prepare_handler_data(
)


@contextmanager
def ensure_span(*args, **kwargs):
    """Yield a span, creating a wrapping transaction first if none is active.

    All arguments are forwarded unchanged to ``sentry_sdk.start_span`` (and,
    when no transaction exists yet, also to ``sentry_sdk.start_transaction``
    so the transaction carries the same op/name/origin as the span).

    Yields:
        The started :class:`sentry_sdk.tracing.Span`.
    """
    current_span = sentry_sdk.get_current_span()
    transaction_exists = (
        current_span is not None and current_span.containing_transaction is not None
    )

    if transaction_exists:
        with sentry_sdk.start_span(*args, **kwargs) as span:
            yield span
    else:
        # No active transaction: start one first so the span is not orphaned
        # (an orphaned span would be dropped by the SDK).
        with sentry_sdk.start_transaction(*args, **kwargs):
            with sentry_sdk.start_span(*args, **kwargs) as span:
                yield span


async def _async_handler_wrapper(
handler_type: str,
func: "Callable[..., Any]",
Expand Down Expand Up @@ -382,7 +407,7 @@ async def _async_handler_wrapper(
) = _prepare_handler_data(handler_type, original_args, original_kwargs)

# Start span and execute
with get_start_span_function()(
with ensure_span(
op=OP.MCP_SERVER,
name=span_name,
origin=MCPIntegration.origin,
Expand Down Expand Up @@ -454,7 +479,7 @@ def _sync_handler_wrapper(
) = _prepare_handler_data(handler_type, original_args)

# Start span and execute
with get_start_span_function()(
with ensure_span(
op=OP.MCP_SERVER,
name=span_name,
origin=MCPIntegration.origin,
Expand Down
9 changes: 7 additions & 2 deletions sentry_sdk/integrations/openai_agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
_create_get_model_wrapper,
_create_get_all_tools_wrapper,
_create_run_wrapper,
_create_run_streamed_wrapper,
_patch_agent_run,
_patch_error_tracing,
)
Expand All @@ -25,12 +26,16 @@ def _patch_runner() -> None:
# Create the root span for one full agent run (including eventual handoffs)
# Note agents.run.DEFAULT_AGENT_RUNNER.run_sync is a wrapper around
# agents.run.DEFAULT_AGENT_RUNNER.run. It does not need to be wrapped separately.
# TODO-anton: Also patch streaming runner: agents.Runner.run_streamed
agents.run.DEFAULT_AGENT_RUNNER.run = _create_run_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run
)

# Creating the actual spans for each agent run.
# Patch streaming runner
agents.run.DEFAULT_AGENT_RUNNER.run_streamed = _create_run_streamed_wrapper(
agents.run.DEFAULT_AGENT_RUNNER.run_streamed
)

# Creating the actual spans for each agent run (works for both streaming and non-streaming).
_patch_agent_run()


Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .models import _create_get_model_wrapper # noqa: F401
from .tools import _create_get_all_tools_wrapper # noqa: F401
from .runner import _create_run_wrapper # noqa: F401
from .runner import _create_run_wrapper, _create_run_streamed_wrapper # noqa: F401
from .agent_run import _patch_agent_run # noqa: F401
from .error_tracing import _patch_error_tracing # noqa: F401
74 changes: 74 additions & 0 deletions sentry_sdk/integrations/openai_agents/patches/agent_run.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from functools import wraps

from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.utils import reraise
from ..spans import (
Expand Down Expand Up @@ -31,6 +32,7 @@ def _patch_agent_run() -> None:

# Store original methods
original_run_single_turn = agents.run.AgentRunner._run_single_turn
original_run_single_turn_streamed = agents.run.AgentRunner._run_single_turn_streamed
original_execute_handoffs = agents._run_impl.RunImpl.execute_handoffs
original_execute_final_output = agents._run_impl.RunImpl.execute_final_output

Expand Down Expand Up @@ -147,10 +149,82 @@ async def patched_execute_final_output(
if agent and context_wrapper and _has_active_agent_span(context_wrapper):
end_invoke_agent_span(context_wrapper, agent, final_output)

# For streaming: close the workflow span if it exists
# (For non-streaming, the workflow span is closed by the context manager in _create_run_wrapper)
if agent and hasattr(agent, "_sentry_workflow_span"):
workflow_span = agent._sentry_workflow_span
workflow_span.__exit__(None, None, None)
delattr(agent, "_sentry_workflow_span")

return result

    @wraps(
        original_run_single_turn_streamed.__func__
        if hasattr(original_run_single_turn_streamed, "__func__")
        else original_run_single_turn_streamed
    )
    async def patched_run_single_turn_streamed(
        cls: "agents.Runner", *args: "Any", **kwargs: "Any"
    ) -> "Any":
        """Patched _run_single_turn_streamed that creates agent invocation spans for streaming.

        Note: Unlike _run_single_turn which uses keyword-only arguments (*,),
        _run_single_turn_streamed uses positional arguments. The call signature is:
        _run_single_turn_streamed(
            streamed_result,               # args[0]
            agent,                         # args[1]
            hooks,                         # args[2]
            context_wrapper,               # args[3]
            run_config,                    # args[4]
            should_run_agent_start_hooks,  # args[5]
            tool_use_tracker,              # args[6]
            all_tools,                     # args[7]
            server_conversation_tracker,   # args[8] (optional)
        )
        """
        # Extract positional arguments (streaming version doesn't use keyword-only args)
        agent = args[1] if len(args) > 1 else kwargs.get("agent")
        context_wrapper = args[3] if len(args) > 3 else kwargs.get("context_wrapper")
        should_run_agent_start_hooks = (
            args[5] if len(args) > 5 else kwargs.get("should_run_agent_start_hooks")
        )

        # NOTE(review): the span is read from context_wrapper here but stored
        # on the *agent* below (agent._sentry_agent_span), so this getattr
        # never observes a span set by this function — confirm the intended
        # storage location.
        span = getattr(context_wrapper, "_sentry_agent_span", None)
        # Start agent span when agent starts (but only once per agent)
        if should_run_agent_start_hooks and agent and context_wrapper:
            # End any existing span for a different agent (handoff case).
            if _has_active_agent_span(context_wrapper):
                current_agent = _get_current_agent(context_wrapper)
                if current_agent and current_agent != agent:
                    end_invoke_agent_span(context_wrapper, current_agent)

            # Build kwargs dict for span creation (for compatibility with _start_invoke_agent_span)
            span_kwargs = {
                "agent": agent,
                "context_wrapper": context_wrapper,
                "should_run_agent_start_hooks": should_run_agent_start_hooks,
            }
            span = _start_invoke_agent_span(context_wrapper, agent, span_kwargs)
            # Mark this agent invocation as a streaming response.
            span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
            agent._sentry_agent_span = span

        # Call original streaming method
        try:
            result = await original_run_single_turn_streamed(*args, **kwargs)
        except Exception as exc:
            # Only record on a still-open span — presumably timestamp is None
            # until the span is finished; TODO confirm against the Span API.
            if span is not None and span.timestamp is None:
                _record_exception_on_span(span, exc)
                end_invoke_agent_span(context_wrapper, agent)

            # Re-raise preserving the original traceback.
            reraise(*sys.exc_info())

        return result

# Apply patches
agents.run.AgentRunner._run_single_turn = classmethod(patched_run_single_turn)
agents.run.AgentRunner._run_single_turn_streamed = classmethod(
patched_run_single_turn_streamed
)
agents._run_impl.RunImpl.execute_handoffs = classmethod(patched_execute_handoffs)
agents._run_impl.RunImpl.execute_final_output = classmethod(
patched_execute_final_output
Expand Down
51 changes: 51 additions & 0 deletions sentry_sdk/integrations/openai_agents/patches/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,57 @@ async def wrapped_get_response(*args: "Any", **kwargs: "Any") -> "Any":

model.get_response = wrapped_get_response

        # Also wrap stream_response for streaming support
        if hasattr(model, "stream_response"):
            original_stream_response = model.stream_response

            @wraps(original_stream_response)
            async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
                """
                Wrap stream_response to create an AI client span for streaming.
                stream_response is an async generator, so we yield events within the span.

                Note: stream_response is called with positional args unlike get_response
                which uses keyword args. The signature is:
                stream_response(
                    system_instructions,  # args[0]
                    input,                # args[1]
                    model_settings,       # args[2]
                    tools,                # args[3]
                    output_schema,        # args[4]
                    handoffs,             # args[5]
                    tracing,              # args[6]
                    *,
                    previous_response_id,
                    conversation_id,
                    prompt,
                )
                """
                # Build kwargs dict from positional args for span data capture
                span_kwargs = dict(kwargs)
                if len(args) > 0:
                    span_kwargs["system_instructions"] = args[0]
                if len(args) > 1:
                    span_kwargs["input"] = args[1]

                # The span stays open for the generator's whole lifetime; if
                # the consumer abandons iteration, GeneratorExit unwinds
                # through this `with` and still closes the span.
                with ai_client_span(agent, span_kwargs) as span:
                    span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)

                    async for event in original_stream_response(*args, **kwargs):
                        yield event

                    # Get response model if captured.
                    # NOTE(review): _sentry_raw_response_model and
                    # _sentry_agent_span are set by other patches — confirm
                    # they are populated before the stream is exhausted.
                    response_model = getattr(agent, "_sentry_raw_response_model", None)
                    if response_model:
                        agent_span = getattr(agent, "_sentry_agent_span", None)
                        if agent_span:
                            agent_span.set_data(
                                SPANDATA.GEN_AI_RESPONSE_MODEL, response_model
                            )
                        delattr(agent, "_sentry_raw_response_model")

            model.stream_response = wrapped_stream_response

return model

return wrapped_get_model
48 changes: 48 additions & 0 deletions sentry_sdk/integrations/openai_agents/patches/runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
from functools import wraps

import sentry_sdk
Expand Down Expand Up @@ -64,3 +65,50 @@ async def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
return run_result

return wrapper


def _create_run_streamed_wrapper(
    original_func: "Callable[..., Any]",
) -> "Callable[..., Any]":
    """
    Wraps the agents.Runner.run_streamed method to create a root span for
    streaming agent workflow runs.

    Unlike run(), run_streamed() returns immediately with a RunResultStreaming
    object while execution continues in a background task. The workflow span
    must stay open throughout the streaming operation and close when streaming
    completes or is abandoned.

    Note: We don't use isolation_scope() here because it uses context variables
    that cannot span async boundaries (the __enter__ and __exit__ would be
    called from different async contexts, causing ValueError).
    """

    @wraps(original_func)
    def wrapper(*args: "Any", **kwargs: "Any") -> "Any":
        # Clone agent because agent invocation spans are attached per run.
        agent = args[0].clone()

        # Start workflow span immediately (before run_streamed returns) so it
        # covers the whole streaming operation.
        workflow_span = agent_workflow_span(agent)
        workflow_span.__enter__()

        # Store span on agent so downstream patches can find and close it.
        agent._sentry_workflow_span = workflow_span

        args = (agent, *args[1:])

        try:
            # Call original function to get RunResultStreaming
            run_result = original_func(*args, **kwargs)
        except Exception as exc:
            # run_streamed itself failed (not the background task): close the
            # span with the exception info, then re-raise with a bare `raise`
            # so the original traceback and __context__ are preserved
            # (`raise exc from None` would suppress the context and rewrite
            # the traceback).
            workflow_span.__exit__(*sys.exc_info())
            _capture_exception(exc)
            raise

        # Store references for cleanup when streaming finishes.
        run_result._sentry_workflow_span = workflow_span
        run_result._sentry_agent = agent

        return run_result

    return wrapper
21 changes: 21 additions & 0 deletions tests/integrations/openai_agents/test_openai_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -1998,3 +1998,24 @@ def test_openai_agents_message_truncation(sentry_init, capture_events):
assert len(parsed_messages) == 2
assert "small message 4" in str(parsed_messages[0])
assert "small message 5" in str(parsed_messages[1])


def test_streaming_patches_applied(sentry_init):
    """
    Test that the streaming patches are applied correctly.

    Both wrappers are installed with functools.wraps, so the patched
    callables must expose the original function via ``__wrapped__`` —
    asserting that (rather than a vacuous ``is not None``) actually
    verifies the patch was applied.
    """
    sentry_init(
        integrations=[OpenAIAgentsIntegration()],
        traces_sample_rate=1.0,
    )

    import agents

    # Both streaming entry points must still exist after patching.
    assert hasattr(agents.run.DEFAULT_AGENT_RUNNER, "run_streamed")
    assert hasattr(agents.run.AgentRunner, "_run_single_turn_streamed")

    # The run_streamed wrapper is applied with @wraps, so the patched
    # callable carries __wrapped__ pointing at the original method.
    run_streamed_func = agents.run.DEFAULT_AGENT_RUNNER.run_streamed
    assert hasattr(run_streamed_func, "__wrapped__")
Loading