diff --git a/python/packages/claude/agent_framework_claude/_agent.py b/python/packages/claude/agent_framework_claude/_agent.py index 43f001b3db..59a18e6af2 100644 --- a/python/packages/claude/agent_framework_claude/_agent.py +++ b/python/packages/claude/agent_framework_claude/_agent.py @@ -5,8 +5,10 @@ import contextlib import logging import sys +import weakref from collections.abc import AsyncIterable, Awaitable, Callable, MutableMapping, Sequence from pathlib import Path +from time import perf_counter, time_ns from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal, overload from agent_framework import ( @@ -27,6 +29,23 @@ normalize_tools, ) from agent_framework.exceptions import AgentException +from agent_framework.observability import ( + OBSERVABILITY_SETTINGS, + OtelAttr, + get_tracer, +) +# These internal helpers are used by AgentTelemetryLayer to build spans. +# ClaudeAgent cannot inherit AgentTelemetryLayer (MRO conflict with its own +# run()), so we reuse the same helpers directly. If the core package later +# exposes public equivalents, this import block should be updated. +from agent_framework.observability import ( + _capture_messages as capture_messages, + _capture_response as capture_response, + _get_response_attributes as get_response_attributes, + _get_span as get_span, + _get_span_attributes as get_span_attributes, + capture_exception, +) from claude_agent_sdk import ( AssistantMessage, ClaudeSDKClient, @@ -620,9 +639,119 @@ def run( self._get_stream(messages, session=session, options=options, **kwargs), finalizer=self._finalize_response, ) + + if not OBSERVABILITY_SETTINGS.ENABLED: + if stream: + return response + return response.get_final_response() + + provider_name = self.AGENT_PROVIDER_NAME + attributes = get_span_attributes( + operation_name=OtelAttr.AGENT_INVOKE_OPERATION, + provider_name=provider_name, + agent_id=self.id, + agent_name=self.name or self.id, + agent_description=self.description, + thread_id=session.service_session_id if session else None, + ) + if stream: + return self._run_with_telemetry_stream( + response, attributes, provider_name, messages, + ) + + return self._run_with_telemetry( + response, attributes, provider_name, messages, + ) + + def _run_with_telemetry_stream( + self, + result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]], + attributes: dict[str, Any], + provider_name: str, + messages: AgentRunInputs | None, + ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: + """Wrap a streaming run with OpenTelemetry tracing.""" + operation = attributes.get(OtelAttr.OPERATION, "operation") + span_name = attributes.get(OtelAttr.AGENT_NAME, "unknown") + span = get_tracer().start_span(f"{operation} {span_name}") + span.set_attributes(attributes) + + if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages: + capture_messages( + span=span, + provider_name=provider_name, + messages=messages, + system_instructions=self._default_options.get("system_prompt"), + ) + + span_state = {"closed": False} + duration_state: dict[str, float] = {} + start_time = perf_counter() + + def _close_span() -> None: + if span_state["closed"]: + return + span_state["closed"] = True + span.end() + + def _record_duration() -> None: + duration_state["duration"] = perf_counter() - start_time + + async def _finalize_stream() -> None: + try: + response = await result_stream.get_final_response() + duration = duration_state.get("duration") + response_attributes = get_response_attributes(attributes, response) + capture_response(span=span, attributes=response_attributes, duration=duration) + if ( + OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED + and isinstance(response, AgentResponse) + and response.messages + ): + capture_messages( + span=span, provider_name=provider_name, messages=response.messages, output=True, + ) + except Exception as exception: + capture_exception(span=span, exception=exception, timestamp=time_ns()) + finally: + _close_span() + + wrapped_stream = result_stream.with_cleanup_hook(_record_duration).with_cleanup_hook(_finalize_stream) + weakref.finalize(wrapped_stream, _close_span) + return wrapped_stream + + async def _run_with_telemetry( + self, + result_stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]], + attributes: dict[str, Any], + provider_name: str, + messages: AgentRunInputs | None, + ) -> AgentResponse[Any]: + """Wrap a non-streaming run with OpenTelemetry tracing.""" + with get_span(attributes=attributes, span_name_attribute=OtelAttr.AGENT_NAME) as span: + if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages: + capture_messages( + span=span, + provider_name=provider_name, + messages=messages, + system_instructions=self._default_options.get("system_prompt"), + ) + start_time = perf_counter() + try: + response = await result_stream.get_final_response() + except Exception as exception: + capture_exception(span=span, exception=exception, timestamp=time_ns()) + raise + duration = perf_counter() - start_time + if response: + response_attributes = get_response_attributes(attributes, response) + capture_response(span=span, attributes=response_attributes, duration=duration) + if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages: + capture_messages( + span=span, provider_name=provider_name, messages=response.messages, output=True, + ) return response - return response.get_final_response() def _finalize_response(self, updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]: """Build AgentResponse and propagate structured_output as value. diff --git a/python/packages/claude/tests/test_claude_agent.py b/python/packages/claude/tests/test_claude_agent.py index 0e126c36b9..99ebd58e58 100644 --- a/python/packages/claude/tests/test_claude_agent.py +++ b/python/packages/claude/tests/test_claude_agent.py @@ -945,3 +945,191 @@ async def test_structured_output_with_error_does_not_propagate(self) -> None: with pytest.raises(AgentException) as exc_info: await agent.run("Hello") assert "Something went wrong" in str(exc_info.value) + + +# region Test ClaudeAgent Telemetry + + +class TestClaudeAgentTelemetry: + """Tests for ClaudeAgent OpenTelemetry instrumentation.""" + + @staticmethod + async def _create_async_generator(items: list[Any]) -> Any: + """Helper to create async generator from list.""" + for item in items: + yield item + + def _create_mock_client(self, messages: list[Any]) -> MagicMock: + """Create a mock ClaudeSDKClient that yields given messages.""" + mock_client = MagicMock() + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.query = AsyncMock() + mock_client.set_model = AsyncMock() + mock_client.set_permission_mode = AsyncMock() + mock_client.receive_response = MagicMock(return_value=self._create_async_generator(messages)) + return mock_client + + def _create_standard_messages(self) -> list[Any]: + """Create a standard set of mock messages for testing.""" + from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock + from claude_agent_sdk.types import StreamEvent + + return [ + StreamEvent( + event={ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "Hello!"}, + }, + uuid="event-1", + session_id="session-123", + ), + AssistantMessage( + content=[TextBlock(text="Hello!")], + model="claude-sonnet", + ), + ResultMessage( + subtype="success", + duration_ms=100, + duration_api_ms=50, + is_error=False, + num_turns=1, + session_id="session-123", + ), + ] + + async def test_run_emits_span_when_instrumentation_enabled(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that run() creates an OpenTelemetry span when instrumentation is enabled.""" + from agent_framework.observability import OBSERVABILITY_SETTINGS + + messages = self._create_standard_messages() + mock_client = self._create_mock_client(messages) + + monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True) + + with ( + patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client), + patch("agent_framework_claude._agent.get_span") as mock_get_span, + ): + mock_span = MagicMock() + mock_get_span.return_value.__enter__ = MagicMock(return_value=mock_span) + mock_get_span.return_value.__exit__ = MagicMock(return_value=False) + + agent = ClaudeAgent(name="test-agent") + response = await agent.run("Hello") + + assert response.text == "Hello!" + mock_get_span.assert_called_once() + call_kwargs = mock_get_span.call_args[1] + assert call_kwargs["attributes"]["gen_ai.agent.name"] == "test-agent" + assert call_kwargs["attributes"]["gen_ai.operation.name"] == "invoke_agent" + + async def test_run_skips_telemetry_when_instrumentation_disabled(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that run() skips telemetry when instrumentation is disabled.""" + from agent_framework.observability import OBSERVABILITY_SETTINGS + + messages = self._create_standard_messages() + mock_client = self._create_mock_client(messages) + + monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", False) + + with ( + patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client), + patch("agent_framework_claude._agent.get_span") as mock_get_span, + ): + agent = ClaudeAgent(name="test-agent") + response = await agent.run("Hello") + + assert response.text == "Hello!" + mock_get_span.assert_not_called() + + async def test_run_stream_emits_span_when_instrumentation_enabled(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that run(stream=True) creates a span when instrumentation is enabled.""" + from agent_framework.observability import OBSERVABILITY_SETTINGS + + messages = self._create_standard_messages() + mock_client = self._create_mock_client(messages) + + monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True) + + with ( + patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client), + patch("agent_framework_claude._agent.get_tracer") as mock_get_tracer, + ): + mock_span = MagicMock() + mock_tracer = MagicMock() + mock_tracer.start_span.return_value = mock_span + mock_get_tracer.return_value = mock_tracer + + agent = ClaudeAgent(name="stream-agent") + updates: list[AgentResponseUpdate] = [] + async for update in agent.run("Hello", stream=True): + updates.append(update) + + assert len(updates) == 1 + mock_tracer.start_span.assert_called_once() + span_name = mock_tracer.start_span.call_args[0][0] + assert "stream-agent" in span_name + assert "invoke_agent" in span_name + + async def test_run_captures_exception_in_span(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that exceptions during run() are captured in the telemetry span.""" + from agent_framework.exceptions import AgentException + from agent_framework.observability import OBSERVABILITY_SETTINGS + from claude_agent_sdk import ResultMessage + + error_messages = [ + ResultMessage( + subtype="error", + duration_ms=100, + duration_api_ms=50, + is_error=True, + num_turns=0, + session_id="error-session", + result="Model not found", + ), + ] + mock_client = self._create_mock_client(error_messages) + + monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True) + + with ( + patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client), + patch("agent_framework_claude._agent.get_span") as mock_get_span, + patch("agent_framework_claude._agent.capture_exception") as mock_capture_exc, + ): + mock_span = MagicMock() + mock_get_span.return_value.__enter__ = MagicMock(return_value=mock_span) + mock_get_span.return_value.__exit__ = MagicMock(return_value=False) + + agent = ClaudeAgent(name="error-agent") + with pytest.raises(AgentException): + await agent.run("Hello") + + mock_capture_exc.assert_called_once() + exc_kwargs = mock_capture_exc.call_args[1] + assert exc_kwargs["span"] is mock_span + assert isinstance(exc_kwargs["exception"], AgentException) + + async def test_telemetry_uses_correct_provider_name(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that telemetry uses AGENT_PROVIDER_NAME as provider.""" + from agent_framework.observability import OBSERVABILITY_SETTINGS + + messages = self._create_standard_messages() + mock_client = self._create_mock_client(messages) + + monkeypatch.setattr(OBSERVABILITY_SETTINGS, "enable_instrumentation", True) + + with ( + patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client), + patch("agent_framework_claude._agent.get_span") as mock_get_span, + ): + mock_span = MagicMock() + mock_get_span.return_value.__enter__ = MagicMock(return_value=mock_span) + mock_get_span.return_value.__exit__ = MagicMock(return_value=False) + + agent = ClaudeAgent(name="test-agent") + await agent.run("Hello") + + call_kwargs = mock_get_span.call_args[1] + assert call_kwargs["attributes"]["gen_ai.provider.name"] == "anthropic.claude"