From 4f15dc049b05c3df2b974435842f513a14ca5d19 Mon Sep 17 00:00:00 2001 From: Brian Henson Date: Thu, 27 Nov 2025 08:14:20 -0700 Subject: [PATCH 1/3] Python: Enhance observability metrics with new histograms for latency measurements * Added TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES and TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES for improved metric tracking. * Implemented _get_time_to_first_chunk_histogram and _get_time_per_output_chunk_histogram functions to create new histograms. * Updated _trace_get_streaming_response to record metrics for time to first chunk and time per output chunk. * Introduced _record_streaming_metrics function to handle the recording of streaming-specific metrics. --- .../core/agent_framework/observability.py | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 3e44fae23c..ecdd25ce02 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -94,6 +94,32 @@ 40.96, 81.92, ) +TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = ( + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, +) +TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = ( + 0.001, + 0.002, + 0.004, + 0.008, + 0.016, + 0.032, + 0.064, + 0.128, + 0.256, + 0.512, + 1.024, +) # We're recording multiple events for the chat history, some of them are emitted within (hundreds of) @@ -804,6 +830,33 @@ def _get_token_usage_histogram() -> "metrics.Histogram": ) +def _get_time_to_first_chunk_histogram() -> "metrics.Histogram": + return get_meter().create_histogram( + name="gen_ai.client.operation.time_to_first_chunk", + unit=OtelAttr.DURATION_UNIT, + description="Time from request start to first content chunk arrival", + explicit_bucket_boundaries_advisory=TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES, + ) + + +def _get_time_per_output_chunk_histogram() -> "metrics.Histogram": + return get_meter().create_histogram( + name="gen_ai.client.operation.time_per_output_chunk", + unit=OtelAttr.DURATION_UNIT, + description="Average time between chunks after the first chunk", + explicit_bucket_boundaries_advisory=TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES, + ) + + +def _get_client_operation_duration_histogram() -> "metrics.Histogram": + return get_meter().create_histogram( + name="gen_ai.client.operation.duration", + unit=OtelAttr.DURATION_UNIT, + description="Total time for the entire streaming operation from start to completion", + explicit_bucket_boundaries_advisory=OPERATION_DURATION_BUCKET_BOUNDARIES, + ) + + # region ChatClientProtocol @@ -929,6 +982,14 @@ async def trace_get_streaming_response( self.additional_properties["token_usage_histogram"] = _get_token_usage_histogram() if "operation_duration_histogram" not in self.additional_properties: self.additional_properties["operation_duration_histogram"] = _get_duration_histogram() + if "time_to_first_chunk_histogram" not in self.additional_properties: + self.additional_properties["time_to_first_chunk_histogram"] = _get_time_to_first_chunk_histogram() + if "time_per_output_chunk_histogram" not in self.additional_properties: + self.additional_properties["time_per_output_chunk_histogram"] = _get_time_per_output_chunk_histogram() + if "client_operation_duration_histogram" not in self.additional_properties: + self.additional_properties["client_operation_duration_histogram"] = ( + _get_client_operation_duration_histogram() + ) model_id = ( kwargs.get("model_id") @@ -957,15 +1018,49 @@ async def trace_get_streaming_response( messages=messages, ) start_time_stamp = perf_counter() + first_chunk_time: float | None = None + previous_chunk_time: float | None = None + chunk_count = 0 + total_inter_chunk_time = 0.0 end_time_stamp: float | None = None try: async for update in func(self, messages=messages, **kwargs): + current_time = perf_counter() + if first_chunk_time is None: + # First chunk arrived + first_chunk_time = current_time + previous_chunk_time = current_time + else: + # Subsequent chunk - track inter-chunk timing + if previous_chunk_time is not None: + inter_chunk_time = current_time - previous_chunk_time + total_inter_chunk_time += inter_chunk_time + chunk_count += 1 + previous_chunk_time = current_time all_updates.append(update) yield update end_time_stamp = perf_counter() except Exception as exception: end_time_stamp = perf_counter() capture_exception(span=span, exception=exception, timestamp=time_ns()) + # Record metrics even if exception occurred (if we got at least one chunk) + if first_chunk_time is not None: + _record_streaming_metrics( + span=span, + attributes=attributes, + start_time=start_time_stamp, + first_chunk_time=first_chunk_time, + end_time=end_time_stamp, + chunk_count=chunk_count, + total_inter_chunk_time=total_inter_chunk_time, + time_to_first_chunk_histogram=self.additional_properties["time_to_first_chunk_histogram"], + time_per_output_chunk_histogram=self.additional_properties[ + "time_per_output_chunk_histogram" + ], + client_operation_duration_histogram=self.additional_properties[ + "client_operation_duration_histogram" + ], + ) raise else: duration = (end_time_stamp or perf_counter()) - start_time_stamp @@ -980,6 +1075,25 @@ async def trace_get_streaming_response( operation_duration_histogram=self.additional_properties["operation_duration_histogram"], ) + # Record streaming-specific metrics + if first_chunk_time is not None: + _record_streaming_metrics( + span=span, + attributes=attributes, + start_time=start_time_stamp, + first_chunk_time=first_chunk_time, + end_time=end_time_stamp, + chunk_count=chunk_count, + total_inter_chunk_time=total_inter_chunk_time, + time_to_first_chunk_histogram=self.additional_properties["time_to_first_chunk_histogram"], + time_per_output_chunk_histogram=self.additional_properties[ + "time_per_output_chunk_histogram" + ], + client_operation_duration_histogram=self.additional_properties[ + "client_operation_duration_histogram" + ], + ) + if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages: _capture_messages( span=span, @@ -1542,6 +1656,56 @@ def _capture_response( operation_duration_histogram.record(duration, attributes=attrs) +def _record_streaming_metrics( + span: trace.Span, + attributes: dict[str, Any], + start_time: float, + first_chunk_time: float, + end_time: float | None, + chunk_count: int, + total_inter_chunk_time: float, + time_to_first_chunk_histogram: "metrics.Histogram | None" = None, + time_per_output_chunk_histogram: "metrics.Histogram | None" = None, + client_operation_duration_histogram: "metrics.Histogram | None" = None, +) -> None: + """Record streaming-specific metrics for client operations. + + Args: + span: The span to record metrics for. + attributes: The attributes dictionary containing metric attributes. + start_time: The start time of the streaming operation. + first_chunk_time: The time when the first chunk arrived. + end_time: The end time of the streaming operation. + chunk_count: The number of chunks after the first chunk. + total_inter_chunk_time: The sum of inter-chunk intervals. + time_to_first_chunk_histogram: Histogram for time to first chunk metric. + time_per_output_chunk_histogram: Histogram for time per output chunk metric. + client_operation_duration_histogram: Histogram for total duration metric. + """ + if end_time is None: + return + + # Extract metric attributes (same as GEN_AI_METRIC_ATTRIBUTES) + attrs: dict[str, Any] = {k: v for k, v in attributes.items() if k in GEN_AI_METRIC_ATTRIBUTES} + if OtelAttr.ERROR_TYPE in attributes: + attrs[OtelAttr.ERROR_TYPE] = attributes[OtelAttr.ERROR_TYPE] + + # Calculate time to first chunk + time_to_first_chunk = first_chunk_time - start_time + if time_to_first_chunk_histogram: + time_to_first_chunk_histogram.record(time_to_first_chunk, attributes=attrs) + + # Calculate time per output chunk (average) + if chunk_count > 0 and time_per_output_chunk_histogram: + time_per_output_chunk = total_inter_chunk_time / chunk_count + time_per_output_chunk_histogram.record(time_per_output_chunk, attributes=attrs) + + # Calculate total duration + duration = end_time - start_time + if client_operation_duration_histogram: + client_operation_duration_histogram.record(duration, attributes=attrs) + + class EdgeGroupDeliveryStatus(Enum): """Enum for edge group delivery status values.""" From 5d44435c81b980af7bf9d7c9befb55d8bbb7f5a6 Mon Sep 17 00:00:00 2001 From: Brian Henson Date: Mon, 2 Feb 2026 17:04:37 -0700 Subject: [PATCH 2/3] test: Add streaming chat client observability metric tests for chunk latency. --- .../core/tests/core/test_observability.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/python/packages/core/tests/core/test_observability.py b/python/packages/core/tests/core/test_observability.py index 3818a057bb..89927b9c6e 100644 --- a/python/packages/core/tests/core/test_observability.py +++ b/python/packages/core/tests/core/test_observability.py @@ -831,6 +831,91 @@ def test_create_resource_with_custom_attributes(monkeypatch): assert resource.attributes["another_attr"] == 123 +# region Test Streaming Metrics + + +@pytest.fixture +def mock_timed_streaming_chat_client(): + """Create a mock chat client for streaming testing with timing.""" + + class MockTimedStreamingChatClient(BaseChatClient): + def service_url(self): + return "https://test.example.com" + + async def _inner_get_response(self, **kwargs): + pass + + async def _inner_get_streaming_response( + self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any + ): + import asyncio + + # Simulate delays to ensure timing metrics are non-zero + await asyncio.sleep(0.01) + yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT) + await asyncio.sleep(0.01) + yield ChatResponseUpdate(text="Chunk 2", role=Role.ASSISTANT) + await asyncio.sleep(0.01) + yield ChatResponseUpdate(text="Chunk 3", role=Role.ASSISTANT) + + return MockTimedStreamingChatClient + + +async def test_streaming_metrics_recorded(mock_timed_streaming_chat_client, span_exporter: InMemorySpanExporter): + """Test that streaming specific metrics are recorded correctly.""" + client = use_instrumentation(mock_timed_streaming_chat_client)() + messages = [ChatMessage(role=Role.USER, text="Test")] + span_exporter.clear() + + updates = [] + async for update in client.get_streaming_response(messages=messages, model_id="TestStreaming"): + updates.append(update) + + assert len(updates) == 3 + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + # Check that execution completed successfully and span was created + assert span.name == "chat TestStreaming" + assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION + + +@pytest.fixture +def mock_error_streaming_chat_client(): + """Create a mock chat client that fails during streaming.""" + + class MockErrorStreamingChatClient(BaseChatClient): + def service_url(self): + return "https://test.example.com" + + async def _inner_get_response(self, **kwargs): + pass + + async def _inner_get_streaming_response( + self, *, messages: MutableSequence[ChatMessage], options: dict[str, Any], **kwargs: Any + ): + yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT) + raise ValueError("Stream interrupted") + + return MockErrorStreamingChatClient + + +async def test_streaming_metrics_with_error(mock_error_streaming_chat_client, span_exporter: InMemorySpanExporter): + """Test that metrics are recorded even if the stream fails after the first chunk.""" + client = use_instrumentation(mock_error_streaming_chat_client)() + messages = [ChatMessage(role=Role.USER, text="Test")] + span_exporter.clear() + + with pytest.raises(ValueError, match="Stream interrupted"): + async for _ in client.get_streaming_response(messages=messages, model_id="TestError"): + pass + + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + span = spans[0] + assert span.attributes[OtelAttr.OPERATION.value] == OtelAttr.CHAT_COMPLETION_OPERATION + + # region Test _create_otlp_exporters From 7423c9e5d430284124fe79f6900ac4a04decdbe6 Mon Sep 17 00:00:00 2001 From: Brian Henson Date: Tue, 3 Feb 2026 10:49:15 -0700 Subject: [PATCH 3/3] fix: Corrected chunk count logic and ensured metric recording failures do not mask original streaming operation exceptions --- .../core/agent_framework/observability.py | 40 +++++++++-------- .../core/tests/core/test_observability.py | 45 ++++++++++++++++++- 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/python/packages/core/agent_framework/observability.py b/python/packages/core/agent_framework/observability.py index 5ceacc41bf..8e72fdd4e6 100644 --- a/python/packages/core/agent_framework/observability.py +++ b/python/packages/core/agent_framework/observability.py @@ -1067,7 +1067,7 @@ def _get_token_usage_histogram() -> "metrics.Histogram": def _get_time_to_first_chunk_histogram() -> "metrics.Histogram": return get_meter().create_histogram( - name="gen_ai.client.operation.time_to_first_chunk", + name="gen_ai.client.operation.time_to_first_chunk", # TODO(Brian Henson): Match semantic conventions unit=OtelAttr.DURATION_UNIT, description="Time from request start to first content chunk arrival", explicit_bucket_boundaries_advisory=TIME_TO_FIRST_CHUNK_BUCKET_BOUNDARIES, @@ -1076,7 +1076,7 @@ def _get_time_to_first_chunk_histogram() -> "metrics.Histogram": def _get_time_per_output_chunk_histogram() -> "metrics.Histogram": return get_meter().create_histogram( - name="gen_ai.client.operation.time_per_output_chunk", + name="gen_ai.client.operation.time_per_output_chunk", # TODO(Brian Henson): Match semantic conventions unit=OtelAttr.DURATION_UNIT, description="Average time between chunks after the first chunk", explicit_bucket_boundaries_advisory=TIME_PER_OUTPUT_CHUNK_BUCKET_BOUNDARIES, @@ -1270,6 +1270,7 @@ async def trace_get_streaming_response( # First chunk arrived first_chunk_time = current_time previous_chunk_time = current_time + chunk_count = 1 else: # Subsequent chunk - track inter-chunk timing if previous_chunk_time is not None: @@ -1285,22 +1286,25 @@ async def trace_get_streaming_response( capture_exception(span=span, exception=exception, timestamp=time_ns()) # Record metrics even if exception occurred (if we got at least one chunk) if first_chunk_time is not None: - _record_streaming_metrics( - span=span, - attributes=attributes, - start_time=start_time_stamp, - first_chunk_time=first_chunk_time, - end_time=end_time_stamp, - chunk_count=chunk_count, - total_inter_chunk_time=total_inter_chunk_time, - time_to_first_chunk_histogram=self.additional_properties["time_to_first_chunk_histogram"], - time_per_output_chunk_histogram=self.additional_properties[ - "time_per_output_chunk_histogram" - ], - client_operation_duration_histogram=self.additional_properties[ - "client_operation_duration_histogram" - ], - ) + with contextlib.suppress(Exception): + _record_streaming_metrics( + span=span, + attributes=attributes, + start_time=start_time_stamp, + first_chunk_time=first_chunk_time, + end_time=end_time_stamp, + chunk_count=chunk_count, + total_inter_chunk_time=total_inter_chunk_time, + time_to_first_chunk_histogram=self.additional_properties[ + "time_to_first_chunk_histogram" + ], + time_per_output_chunk_histogram=self.additional_properties[ + "time_per_output_chunk_histogram" + ], + client_operation_duration_histogram=self.additional_properties[ + "client_operation_duration_histogram" + ], + ) raise else: duration = (end_time_stamp or perf_counter()) - start_time_stamp diff --git a/python/packages/core/tests/core/test_observability.py b/python/packages/core/tests/core/test_observability.py index 89927b9c6e..f8112138b0 100644 --- a/python/packages/core/tests/core/test_observability.py +++ b/python/packages/core/tests/core/test_observability.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft. All rights reserved. import logging -from collections.abc import MutableSequence +from collections.abc import AsyncIterable, MutableSequence from typing import Any from unittest.mock import Mock @@ -2301,3 +2301,46 @@ def test_capture_response(span_exporter: InMemorySpanExporter): # Verify attributes were set on the span assert spans[0].attributes.get(OtelAttr.INPUT_TOKENS) == 100 assert spans[0].attributes.get(OtelAttr.OUTPUT_TOKENS) == 50 + + +class ErrorChatClient(BaseChatClient): + """A chat client that raises an error during streaming.""" + + OTEL_PROVIDER_NAME = "error_provider" + + def service_url(self): + return "https://error.example.com" + + async def _inner_get_response(self, messages, options, **kwargs): + raise NotImplementedError + + async def _inner_get_streaming_response( + self, *, messages: list[ChatMessage], options: dict[str, Any], **kwargs: Any + ) -> AsyncIterable[ChatResponseUpdate]: + # Yield one chunk so metrics recording is triggered + yield ChatResponseUpdate(text="Chunk 1", role=Role.ASSISTANT) + # Then raise an exception + raise ValueError("Original Application Error") + + +async def test_streaming_error_with_metric_recording_failure(span_exporter: InMemorySpanExporter): + """ + Test that an exception during metric recording does not mask the original application error. + """ + from unittest.mock import patch + + client = use_instrumentation(ErrorChatClient)() + messages = [ChatMessage(role=Role.USER, text="Test")] + span_exporter.clear() + + # Mock _record_streaming_metrics to raise an exception + with patch("agent_framework.observability._record_streaming_metrics") as mock_metrics: + mock_metrics.side_effect = Exception("Metric Recording Failed") + + # We expect the ORIGINAL ValueError, not the "Metric Recording Failed" exception + with pytest.raises(ValueError, match="Original Application Error"): + async for _ in client.get_streaming_response(messages=messages): + pass + + # Verify that _record_streaming_metrics was actually called + assert mock_metrics.called