Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 130 additions & 1 deletion python/packages/claude/agent_framework_claude/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import contextlib
import logging
import sys
import weakref
from collections.abc import AsyncIterable, Awaitable, Callable, MutableMapping, Sequence
from pathlib import Path
from time import perf_counter, time_ns
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal, overload

from agent_framework import (
Expand All @@ -27,6 +29,23 @@
normalize_tools,
)
from agent_framework.exceptions import AgentException
from agent_framework.observability import (
OBSERVABILITY_SETTINGS,
OtelAttr,
get_tracer,
)
# These internal helpers are used by AgentTelemetryLayer to build spans.
# ClaudeAgent cannot inherit AgentTelemetryLayer (MRO conflict with its own
# run()), so we reuse the same helpers directly. If the core package later
# exposes public equivalents, this import block should be updated.
from agent_framework.observability import (
_capture_messages as capture_messages,
_capture_response as capture_response,
_get_response_attributes as get_response_attributes,
_get_span as get_span,
_get_span_attributes as get_span_attributes,
capture_exception,
)
from claude_agent_sdk import (
AssistantMessage,
ClaudeSDKClient,
Expand Down Expand Up @@ -620,9 +639,119 @@ def run(
self._get_stream(messages, session=session, options=options, **kwargs),
finalizer=self._finalize_response,
)

if not OBSERVABILITY_SETTINGS.ENABLED:
if stream:
return response
return response.get_final_response()

provider_name = self.AGENT_PROVIDER_NAME
attributes = get_span_attributes(
operation_name=OtelAttr.AGENT_INVOKE_OPERATION,
provider_name=provider_name,
agent_id=self.id,
agent_name=self.name or self.id,
agent_description=self.description,
thread_id=session.service_session_id if session else None,
)

if stream:
return self._run_with_telemetry_stream(
response, attributes, provider_name, messages,
)

return self._run_with_telemetry(
response, attributes, provider_name, messages,
)

def _run_with_telemetry_stream(
    self,
    result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]],
    attributes: dict[str, Any],
    provider_name: str,
    messages: AgentRunInputs | None,
) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
    """Wrap a streaming run with OpenTelemetry tracing.

    A span named ``"<operation> <agent name>"`` is started manually (not via a
    context manager) and is ended by a cleanup hook registered on the stream.
    As a safety net, ``weakref.finalize`` also ends the span when the wrapped
    stream object is garbage collected.

    Args:
        result_stream: The response stream produced by the agent run.
        attributes: Span attributes; also used to derive the span name.
        provider_name: Provider name forwarded to ``capture_messages``.
        messages: The run inputs; recorded on the span only when sensitive
            data capture is enabled and the value is truthy.

    Returns:
        The stream returned by chaining the duration and finalizer cleanup
        hooks onto ``result_stream``.

    Raises:
        Exception: Any error raised while decorating the span during setup is
            recorded on the span, the span is ended, and the error re-raised.
    """
    operation = attributes.get(OtelAttr.OPERATION, "operation")
    span_name = attributes.get(OtelAttr.AGENT_NAME, "unknown")
    span = get_tracer().start_span(f"{operation} {span_name}")

    span_state = {"closed": False}

    def _close_span() -> None:
        # Idempotent: reachable from the setup error path, the finalizer hook,
        # and the weakref finalizer, but the span must be ended only once.
        if span_state["closed"]:
            return
        span_state["closed"] = True
        span.end()

    # The span is already open here; if decorating it fails we must end it
    # ourselves, because the cleanup hooks have not been registered yet.
    try:
        span.set_attributes(attributes)
        if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
            capture_messages(
                span=span,
                provider_name=provider_name,
                messages=messages,
                system_instructions=self._default_options.get("system_prompt"),
            )
    except Exception as exception:
        capture_exception(span=span, exception=exception, timestamp=time_ns())
        _close_span()
        raise

    duration_state: dict[str, float] = {}
    start_time = perf_counter()

    def _record_duration() -> None:
        # Registered before _finalize_stream so the duration is stored first.
        duration_state["duration"] = perf_counter() - start_time

    async def _finalize_stream() -> None:
        try:
            response = await result_stream.get_final_response()
            duration = duration_state.get("duration")
            response_attributes = get_response_attributes(attributes, response)
            capture_response(span=span, attributes=response_attributes, duration=duration)
            if (
                OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED
                and isinstance(response, AgentResponse)
                and response.messages
            ):
                capture_messages(
                    span=span,
                    provider_name=provider_name,
                    messages=response.messages,
                    output=True,
                )
        except Exception as exception:
            # Telemetry finalization is best-effort: record the failure on the
            # span instead of propagating it out of the cleanup hook.
            capture_exception(span=span, exception=exception, timestamp=time_ns())
        finally:
            _close_span()

    wrapped_stream = result_stream.with_cleanup_hook(_record_duration).with_cleanup_hook(_finalize_stream)
    # Fallback: end the span if the stream is discarded without the hooks running.
    weakref.finalize(wrapped_stream, _close_span)
    return wrapped_stream

async def _run_with_telemetry(
    self,
    result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]],
    attributes: dict[str, Any],
    provider_name: str,
    messages: AgentRunInputs | None,
) -> AgentResponse[Any]:
    """Await the final response of a non-streaming run inside a telemetry span.

    Args:
        result_stream: The response stream whose final response is awaited.
        attributes: Span attributes passed to ``get_span`` and used as the
            base for the response attributes.
        provider_name: Provider name forwarded to ``capture_messages``.
        messages: The run inputs; recorded only when sensitive data capture is
            enabled and the value is truthy.

    Returns:
        The final response obtained from ``result_stream``.

    Raises:
        Exception: Errors from awaiting the final response are recorded on the
            span and re-raised unchanged.
    """
    with get_span(attributes=attributes, span_name_attribute=OtelAttr.AGENT_NAME) as span:
        if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages:
            capture_messages(
                span=span,
                provider_name=provider_name,
                messages=messages,
                system_instructions=self._default_options.get("system_prompt"),
            )

        started_at = perf_counter()
        try:
            final_response = await result_stream.get_final_response()
        except Exception as error:
            capture_exception(span=span, exception=error, timestamp=time_ns())
            raise
        elapsed = perf_counter() - started_at

        # Nothing to record for a falsy response; hand it back untouched.
        if not final_response:
            return final_response

        capture_response(
            span=span,
            attributes=get_response_attributes(attributes, final_response),
            duration=elapsed,
        )
        if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and final_response.messages:
            capture_messages(
                span=span,
                provider_name=provider_name,
                messages=final_response.messages,
                output=True,
            )
        return final_response
return response.get_final_response()

def _finalize_response(self, updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]:
"""Build AgentResponse and propagate structured_output as value.
Expand Down
188 changes: 188 additions & 0 deletions python/packages/claude/tests/test_claude_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,3 +945,191 @@ async def test_structured_output_with_error_does_not_propagate(self) -> None:
with pytest.raises(AgentException) as exc_info:
await agent.run("Hello")
assert "Something went wrong" in str(exc_info.value)


# region Test ClaudeAgent Telemetry


class TestClaudeAgentTelemetry:
    """Exercise the OpenTelemetry instrumentation of ClaudeAgent.run()."""

    @staticmethod
    async def _create_async_generator(items: list[Any]) -> Any:
        """Yield every element of ``items`` from an async generator."""
        for entry in items:
            yield entry

    def _create_mock_client(self, messages: list[Any]) -> MagicMock:
        """Build a stand-in ClaudeSDKClient whose receive_response() yields ``messages``."""
        client = MagicMock()
        for coroutine_method in ("connect", "disconnect", "query", "set_model", "set_permission_mode"):
            setattr(client, coroutine_method, AsyncMock())
        client.receive_response = MagicMock(return_value=self._create_async_generator(messages))
        return client

    def _create_standard_messages(self) -> list[Any]:
        """Return one stream event, one assistant message and one success result."""
        from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock
        from claude_agent_sdk.types import StreamEvent

        text_delta = StreamEvent(
            event={
                "type": "content_block_delta",
                "delta": {"type": "text_delta", "text": "Hello!"},
            },
            uuid="event-1",
            session_id="session-123",
        )
        assistant_reply = AssistantMessage(
            content=[TextBlock(text="Hello!")],
            model="claude-sonnet",
        )
        success_result = ResultMessage(
            subtype="success",
            duration_ms=100,
            duration_api_ms=50,
            is_error=False,
            num_turns=1,
            session_id="session-123",
        )
        return [text_delta, assistant_reply, success_result]

    @staticmethod
    def _stub_span_context(mock_get_span: MagicMock) -> MagicMock:
        """Make the patched get_span() usable as a context manager and return its span."""
        span = MagicMock()
        span_context = mock_get_span.return_value
        span_context.__enter__ = MagicMock(return_value=span)
        span_context.__exit__ = MagicMock(return_value=False)
        return span

    async def test_run_emits_span_when_instrumentation_enabled(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """run() opens an OpenTelemetry span when instrumentation is enabled."""
        from agent_framework.observability import OBSERVABILITY_SETTINGS

        client = self._create_mock_client(self._create_standard_messages())
        monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True)

        with (
            patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=client),
            patch("agent_framework_claude._agent.get_span") as mock_get_span,
        ):
            self._stub_span_context(mock_get_span)
            agent = ClaudeAgent(name="test-agent")
            response = await agent.run("Hello")

        assert response.text == "Hello!"
        mock_get_span.assert_called_once()
        span_attributes = mock_get_span.call_args.kwargs["attributes"]
        assert span_attributes["gen_ai.agent.name"] == "test-agent"
        assert span_attributes["gen_ai.operation.name"] == "invoke_agent"

    async def test_run_skips_telemetry_when_instrumentation_disabled(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """run() does not touch get_span when instrumentation is disabled."""
        from agent_framework.observability import OBSERVABILITY_SETTINGS

        client = self._create_mock_client(self._create_standard_messages())
        monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", False)

        with (
            patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=client),
            patch("agent_framework_claude._agent.get_span") as mock_get_span,
        ):
            agent = ClaudeAgent(name="test-agent")
            response = await agent.run("Hello")

        assert response.text == "Hello!"
        mock_get_span.assert_not_called()

    async def test_run_stream_emits_span_when_instrumentation_enabled(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """run(stream=True) starts a span when instrumentation is enabled."""
        from agent_framework.observability import OBSERVABILITY_SETTINGS

        client = self._create_mock_client(self._create_standard_messages())
        monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True)

        with (
            patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=client),
            patch("agent_framework_claude._agent.get_tracer") as mock_get_tracer,
        ):
            tracer = MagicMock()
            tracer.start_span.return_value = MagicMock()
            mock_get_tracer.return_value = tracer

            agent = ClaudeAgent(name="stream-agent")
            updates: list[AgentResponseUpdate] = [update async for update in agent.run("Hello", stream=True)]

        assert len(updates) == 1
        tracer.start_span.assert_called_once()
        started_span_name = tracer.start_span.call_args.args[0]
        assert "stream-agent" in started_span_name
        assert "invoke_agent" in started_span_name

    async def test_run_captures_exception_in_span(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A failing run() records its exception on the telemetry span."""
        from agent_framework.exceptions import AgentException
        from agent_framework.observability import OBSERVABILITY_SETTINGS
        from claude_agent_sdk import ResultMessage

        failure = ResultMessage(
            subtype="error",
            duration_ms=100,
            duration_api_ms=50,
            is_error=True,
            num_turns=0,
            session_id="error-session",
            result="Model not found",
        )
        client = self._create_mock_client([failure])
        monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True)

        with (
            patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=client),
            patch("agent_framework_claude._agent.get_span") as mock_get_span,
            patch("agent_framework_claude._agent.capture_exception") as mock_capture_exc,
        ):
            span = self._stub_span_context(mock_get_span)
            agent = ClaudeAgent(name="error-agent")
            with pytest.raises(AgentException):
                await agent.run("Hello")

        mock_capture_exc.assert_called_once()
        captured = mock_capture_exc.call_args.kwargs
        assert captured["span"] is span
        assert isinstance(captured["exception"], AgentException)

    async def test_telemetry_uses_correct_provider_name(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """The span attributes carry AGENT_PROVIDER_NAME as the provider."""
        from agent_framework.observability import OBSERVABILITY_SETTINGS

        client = self._create_mock_client(self._create_standard_messages())
        monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True)

        with (
            patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=client),
            patch("agent_framework_claude._agent.get_span") as mock_get_span,
        ):
            self._stub_span_context(mock_get_span)
            agent = ClaudeAgent(name="test-agent")
            await agent.run("Hello")

        span_attributes = mock_get_span.call_args.kwargs["attributes"]
        assert span_attributes["gen_ai.provider.name"] == "anthropic.claude"
Loading