diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 2d294daddd..8e72fdd4e6 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -100,6 +100,32 @@ 40.96, 81.92, ) +TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = ( + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, +) +TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = ( + 0.001, + 0.002, + 0.004, + 0.008, + 0.016, + 0.032, + 0.064, + 0.128, + 0.256, + 0.512, + 1.024, +) # We're recording multiple events for the chat history, some of them are emitted within (hundreds of) @@ -1039,6 +1065,33 @@ def _get_token_usage_histogram() -> "metrics.Histogram": ) +def _get_time_to_first_chunk_histogram() -> "metrics.Histogram": + return get_meter().create_histogram( + name="gen_ai.client.operation.time_to_first_chunk", # TODO(Brian Henson): Match semantic conventions + unit=OtelAttr.DURATION_UNIT, + description="Time from request start to first content chunk arrival", + explicit_bucket_boundaries_advisory=TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES, + ) + + +def _get_time_per_output_chunk_histogram() -> "metrics.Histogram": + return get_meter().create_histogram( + name="gen_ai.client.operation.time_per_output_chunk", # TODO(Brian Henson): Match semantic conventions + unit=OtelAttr.DURATION_UNIT, + description="Average time between chunks after the first chunk", + explicit_bucket_boundaries_advisory=TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES, + ) + + +def _get_client_operation_duration_histogram() -> "metrics.Histogram": + return get_meter().create_histogram( + name="gen_ai.client.operation.duration", + unit=OtelAttr.DURATION_UNIT, + description="Total time for the entire streaming operation from start to completion", + explicit_bucket_boundaries_advisory=OPERATION_DURATION_BUCKET_BOUNDARIES, + ) + + # region ChatClientProtocol @@ -1172,6 +1225,14 @@ async def trace_get_streaming_response( self.additional_properties["token_usage_histogram"] = _get_token_usage_histogram() if "operation_duration_histogram" not in self.additional_properties: self.additional_properties["operation_duration_histogram"] = _get_duration_histogram() + if "time_to_first_chunk_histogram" not in self.additional_properties: + self.additional_properties["time_to_first_chunk_histogram"] = _get_time_to_first_chunk_histogram() + if "time_per_output_chunk_histogram" not in self.additional_properties: + self.additional_properties["time_per_output_chunk_histogram"] = _get_time_per_output_chunk_histogram() + if "client_operation_duration_histogram" not in self.additional_properties: + self.additional_properties["client_operation_duration_histogram"] = ( + _get_client_operation_duration_histogram() + ) options = options or {} model_id = kwargs.get("model_id") or options.get("model_id") or getattr(self, "model_id", None) or "unknown" @@ -1197,15 +1258,53 @@ async def trace_get_streaming_response( system_instructions=options.get("instructions"), ) start_time_stamp = perf_counter() + first_chunk_time: float | None = None + previous_chunk_time: float | None = None + chunk_count = 0 + total_inter_chunk_time = 0.0 end_time_stamp: float | None = None try: async for update in func(self, messages=messages, options=options, **kwargs): + current_time = perf_counter() + if first_chunk_time is None: + # First chunk arrived + first_chunk_time = current_time + previous_chunk_time = current_time + chunk_count = 1 + else: + # Subsequent chunk - track inter-chunk timing + if previous_chunk_time is not None: + inter_chunk_time = current_time - previous_chunk_time + total_inter_chunk_time += inter_chunk_time + chunk_count += 1 + previous_chunk_time = current_time all_updates.append(update) yield update end_time_stamp = perf_counter() except Exception as exception: end_time_stamp = perf_counter() capture_exception(span=span, exception=exception, timestamp=time_ns()) + # Record metrics even if exception occurred (if we got at least one chunk) + if first_chunk_time is not None: + with contextlib.suppress(Exception): + _record_streaming_metrics( + span=span, + attributes=attributes, + start_time=start_time_stamp, + first_chunk_time=first_chunk_time, + end_time=end_time_stamp, + chunk_count=chunk_count, + total_inter_chunk_time=total_inter_chunk_time, + time_to_first_chunk_histogram=self.additional_properties[ + "time_to_first_chunk_histogram" + ], + time_per_output_chunk_histogram=self.additional_properties[ + "time_per_output_chunk_histogram" + ], + client_operation_duration_histogram=self.additional_properties[ + "client_operation_duration_histogram" + ], + ) raise else: duration = (end_time_stamp or perf_counter()) - start_time_stamp @@ -1220,6 +1319,25 @@ async def trace_get_streaming_response( operation_duration_histogram=self.additional_properties["operation_duration_histogram"], ) + # Record streaming-specific metrics + if first_chunk_time is not None: + _record_streaming_metrics( + span=span, + attributes=attributes, + start_time=start_time_stamp, + first_chunk_time=first_chunk_time, + end_time=end_time_stamp, + chunk_count=chunk_count, + total_inter_chunk_time=total_inter_chunk_time, + time_to_first_chunk_histogram=self.additional_properties["time_to_first_chunk_histogram"], + time_per_output_chunk_histogram=self.additional_properties[ + "time_per_output_chunk_histogram" + ], + client_operation_duration_histogram=self.additional_properties[ + "client_operation_duration_histogram" + ], + ) + if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages: _capture_messages( span=span, @@ -1850,6 +1968,56 @@ def _capture_response( operation_duration_histogram.record(duration, attributes=attrs) +def _record_streaming_metrics( + span: trace.Span, + attributes: dict[str, Any], + start_time: float, + first_chunk_time: float, + end_time: float | None, + chunk_count: int, + total_inter_chunk_time: float, + time_to_first_chunk_histogram: "metrics.Histogram | None" = None, + time_per_output_chunk_histogram: "metrics.Histogram | None" = None, + client_operation_duration_histogram: "metrics.Histogram | None" = None, +) -> None: + """Record streaming-specific metrics for client operations. + + Args: + span: The span to record metrics for. + attributes: The attributes dictionary containing metric attributes. + start_time: The start time of the streaming operation. + first_chunk_time: The time when the first chunk arrived. + end_time: The end time of the streaming operation. + chunk_count: The number of chunks after the first chunk. + total_inter_chunk_time: The sum of inter-chunk intervals. + time_to_first_chunk_histogram: Histogram for time to first chunk metric. + time_per_output_chunk_histogram: Histogram for time per output chunk metric. + client_operation_duration_histogram: Histogram for total duration metric. + """ + if end_time is None: + return + + # Extract metric attributes (same as GEN_AI_METRIC_ATTRIBUTES) + attrs: dict[str, Any] = {k: v for k, v in attributes.items() if k in GEN_AI_METRIC_ATTRIBUTES} + if OtelAttr.ERROR_TYPE in attributes: + attrs[OtelAttr.ERROR_TYPE] = attributes[OtelAttr.ERROR_TYPE] + + # Calculate time to first chunk + time_to_first_chunk = first_chunk_time - start_time + if time_to_first_chunk_histogram: + time_to_first_chunk_histogram.record(time_to_first_chunk, attributes=attrs) + + # Calculate time per output chunk (average) + if chunk_count > 0 and time_per_output_chunk_histogram: + time_per_output_chunk = total_inter_chunk_time / chunk_count + time_per_output_chunk_histogram.record(time_per_output_chunk, attributes=attrs) + + # Calculate total duration + duration = end_time - start_time + if client_operation_duration_histogram: + client_operation_duration_histogram.record(duration, attributes=attrs) + + class EdgeGroupDeliveryStatus(Enum): """Enum for edge group delivery status values.""" diff --git a/python/packages/core/tests/core/test_observability.py b/python/packages/core/tests/core/test_observability.py index 3818a057bb..f8112138b0 100644 --- a/python/packages/core/tests/core/test_observability.py +++ b/python/packages/core/tests/core/test_observability.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft. All rights reserved. import logging -from collections.abc import MutableSequence +from collections.abc import AsyncIterable, MutableSequence from typing import Any from unittest.mock import Mock @@ -831,6 +831,91 @@ def test_create_resource_with_custom_attributes(monkeypatch): assert resource.attributes["another_attr"] == 123 +# region Test Streaming Metrics + + +@pytest.fixture +def mock_timed_streaming_chat_client(): + """Create a mock chat client for streaming testing with timing.""" + + class MockTimedStreamingChatClient(BaseChatClient): + def service_url(self): + return "https://test.example.com" + + async def _inner_get_response(self, **kwargs): + pass + + async def _inner_get_streaming_response( + self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any + ): + import asyncio + + # Simulate delays to ensure timing metrics are non-zero + await asyncio.sleep(0.01) + yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT) + await asyncio.sleep(0.01) + yield ChatResponseUpdate(text="Chunk 2", role=Role.ASSISTANT) + await asyncio.sleep(0.01) + yield ChatResponseUpdate(text="Chunk 3", role=Role.ASSISTANT) + + return MockTimedStreamingChatClient + + +async def test_streaming_metrics_recorded(mock_timed_streaming_chat_client, span_exporter: InMemorySpanExporter): + """Test that streaming specific metrics are recorded correctly.""" + client = use_instrumentation(mock_timed_streaming_chat_client)() + messages = [ChatMessage(role=Role.USER, text="Test")] + span_exporter.clear() + + updates = [] + async for update in client.get_streaming_response(messages=messages, model_id="TestStreaming"): + updates.append(update) + + assert len(updates) == 3 + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + # Check that execution completed successfully and span was created + assert span.name == "chat TestStreaming" + assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION + + +@pytest.fixture +def mock_error_streaming_chat_client(): + """Create a mock chat client that fails during streaming.""" + + class MockErrorStreamingChatClient(BaseChatClient): + def service_url(self): + return "https://test.example.com" + + async def _inner_get_response(self, **kwargs): + pass + + async def _inner_get_streaming_response( + self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any + ): + yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT) + raise ValueError("Stream interrupted") + + return MockErrorStreamingChatClient + + +async def test_streaming_metrics_with_error(mock_error_streaming_chat_client, span_exporter: InMemorySpanExporter): + """Test that metrics are recorded even if the stream fails after the first chunk.""" + client = use_instrumentation(mock_error_streaming_chat_client)() + messages = [ChatMessage(role=Role.USER, text="Test")] + span_exporter.clear() + + with pytest.raises(ValueError, match="Stream interrupted"): + async for _ in client.get_streaming_response(messages=messages, model_id="TestError"): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION + + # region Test _create_otlp_exporters @@ -2216,3 +2301,46 @@ def test_capture_response(span_exporter: InMemorySpanExporter): # Verify attributes were set on the span assert spans[0].attributes.get(OtelAttr.INPUT_TOKENS) == 100 assert spans[0].attributes.get(OtelAttr.OUTPUT_TOKENS) == 50 + + +class ErrorChatClient(BaseChatClient): + """A chat client that raises an error during streaming.""" + + OTEL_PROVIDER_NAME = "error_provider" + + def service_url(self): + return "https://error.example.com" + + async def _inner_get_response(self, messages, options, **kwargs): + raise NotImplementedError + + async def _inner_get_streaming_response( + self, *, messages: list[ChatMessage], options: dict[str, Any], **kwargs: Any + ) -> AsyncIterable[ChatResponseUpdate]: + # Yield one chunk so metrics recording is triggered + yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT) + # Then raise an exception + raise ValueError("Original Application Error") + + +async def test_streaming_error_with_metric_recording_failure(span_exporter: InMemorySpanExporter): + """ + Test that an exception during metric recording does not mask the original application error. + """ + from unittest.mock import patch + + client = use_instrumentation(ErrorChatClient)() + messages = [ChatMessage(role=Role.USER, text="Test")] + span_exporter.clear() + + # Mock _record_streaming_metrics to raise an exception + with patch("agent_framework.observability._record_streaming_metrics") as mock_metrics: + mock_metrics.side_effect = Exception("Metric Recording Failed") + + # We expect the ORIGINAL ValueError, not the "Metric Recording Failed" exception + with pytest.raises(ValueError, match="Original Application Error"): + async for _ in client.get_streaming_response(messages=messages): + pass + + # Verify that _record_streaming_metrics was actually called + assert mock_metrics.called