From 421bf6a5879e706c3d6a3f7f2888d797116f8397 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 7 May 2026 12:22:36 -0400 Subject: [PATCH 1/9] feat(streaming): emit OTel metrics for ttft, tps, and per-call token counts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds six metrics to TemporalStreamingModel.get_response so applications that configure an OTel MeterProvider can see streaming-call behavior without per-app instrumentation: - agentex.llm.ttft (histogram, ms): time from request start to first content delta. Captured on the first ResponseTextDeltaEvent / ResponseReasoningTextDeltaEvent / ResponseReasoningSummaryTextDeltaEvent. - agentex.llm.tps (histogram, tokens/s): output_tokens / stream_duration. Use 1/tps for time-per-output-token (tpot). - agentex.llm.input_tokens / output_tokens / cached_input_tokens / reasoning_tokens (counters): pulled from the captured ResponsesAPI Usage at end-of-stream. Cache hit rate is computed at query time as rate(cached_input_tokens) / rate(input_tokens). Why - The data was already captured (line 854 captured_usage = response.usage) but never emitted as metrics. Apps could only see total LLM call duration, not the meaningful breakdowns. - Doing this in the SDK rather than each app means every consumer of TemporalStreamingModel gets the metrics for free. - Cardinality is bounded — only `model` is a metric attribute. Resource attributes (service.name, k8s.*, etc.) come from the application's configured OTel resource, so cross-app comparisons work cleanly in Mimir/Prometheus. The meter is a no-op when no MeterProvider is configured, so this is safe for apps that don't run with OTel. --- .../models/temporal_streaming_model.py | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 4f18ae379..523cc690c 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -1,6 +1,7 @@ """Custom Temporal Model Provider with streaming support for OpenAI agents.""" from __future__ import annotations +import time import uuid from typing import Any, List, Union, Optional, override @@ -26,6 +27,7 @@ CodeInterpreterTool, ImageGenerationTool, ) +from opentelemetry import metrics from agents.computer import Computer, AsyncComputer # Re-export the canonical StreamingMode literal from the streaming service so @@ -78,6 +80,44 @@ logger = make_logger("agentex.temporal.streaming") +# OTel metrics for LLM streaming behavior. The meter resolves to whatever +# MeterProvider the application has configured (no-op if none). All metrics +# carry only a ``model`` attribute to keep cardinality bounded; resource +# attributes (service.name, k8s.*, etc.) are added by the application's OTel +# resource configuration. 
+_meter = metrics.get_meter("agentex.openai_agents.streaming") +_ttft_ms = _meter.create_histogram( + name="agentex.llm.ttft", + unit="ms", + description="Time from streaming-request start to first content token (ms)", +) +_tps = _meter.create_histogram( + name="agentex.llm.tps", + unit="tokens/s", + description="Output tokens per second across the streaming response", +) +_input_tokens = _meter.create_counter( + name="agentex.llm.input_tokens", + unit="tokens", + description="Total input tokens sent to the LLM", +) +_output_tokens = _meter.create_counter( + name="agentex.llm.output_tokens", + unit="tokens", + description="Total output tokens returned by the LLM", +) +_cached_input_tokens = _meter.create_counter( + name="agentex.llm.cached_input_tokens", + unit="tokens", + description="Subset of input tokens served from prompt cache", +) +_reasoning_tokens = _meter.create_counter( + name="agentex.llm.reasoning_tokens", + unit="tokens", + description="Output tokens spent on reasoning (subset of output_tokens)", +) + + def _serialize_item(item: Any) -> dict[str, Any]: """ Universal serializer for any item type from OpenAI Agents SDK. @@ -642,6 +682,12 @@ async def get_response( reasoning_summaries = [] reasoning_contents = [] event_count = 0 + # Wall-clock instrumentation for ttft / tps / tpot. ``stream_start_perf`` + # bookmarks just before the event loop so the timer captures only the + # streaming portion, not request setup. ``first_token_at`` is set on + # the first content delta (text or reasoning summary). + stream_start_perf = time.perf_counter() + first_token_at: Optional[float] = None # We expect task_id to always be provided for streaming if not task_id: @@ -721,6 +767,10 @@ async def get_response( # Handle text streaming delta = getattr(event, 'delta', '') + # First content-bearing event in this stream — bookmark for ttft. + if first_token_at is None: + first_token_at = time.perf_counter() + if isinstance(event, ResponseReasoningSummaryTextDeltaEvent) and reasoning_context: # Stream reasoning summary deltas - these are the actual reasoning tokens! try: @@ -983,6 +1033,21 @@ async def get_response( span.output = output_data + # Emit LLM metrics derived from the captured stream. The meter is a + # no-op if the application hasn't configured a MeterProvider, so this + # is safe to do unconditionally. We only emit ttft / tps when their + # input data is actually meaningful (got a content delta, got tokens). + metric_attrs = {"model": self.model_name} + stream_duration_s = time.perf_counter() - stream_start_perf + _input_tokens.add(usage.input_tokens or 0, metric_attrs) + _output_tokens.add(usage.output_tokens or 0, metric_attrs) + _cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs) + _reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs) + if first_token_at is not None: + _ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) + if (usage.output_tokens or 0) > 0 and stream_duration_s > 0: + _tps.record(usage.output_tokens / stream_duration_s, metric_attrs) + # Return the response. 
response_id is the server-issued id from # ResponseCompletedEvent.response.id, or None when the stream ended # without a completed event (error path) — matching the documented From 45733c9565ba63d1aae6aac939795ec81db4ce48 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 7 May 2026 12:40:59 -0400 Subject: [PATCH 2/9] review: address greptile feedback on llm metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes from PR #347 review: 1. Move stream_start_perf above responses.create() so ttft captures the full request-to-first-token latency (HTTP round-trip + model TTFB), not just post-connect event-loop delay. Drop the duplicate assignment that was being overwritten after the await. 2. Track last_token_at alongside first_token_at and use the generation window (first→last delta) as the tps denominator. Total stream duration was inflated by tool-call argument deltas, reasoning events, and stream_update awaits — making tps under-report the model's actual generation speed for agentic responses. 3. Lazy-create the OTel instruments inside _StreamingMetrics rather than at import time, so a MeterProvider configured *after* this module is imported still binds correctly. Apps that bootstrap OTel in a startup hook (common with lazy-init patterns) would otherwise silently send to the no-op provider. --- .../models/temporal_streaming_model.py | 140 +++++++++++------- 1 file changed, 88 insertions(+), 52 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 523cc690c..e711e0051 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -80,42 +80,61 @@ logger = make_logger("agentex.temporal.streaming") -# OTel metrics for LLM streaming behavior. The meter resolves to whatever -# MeterProvider the application has configured (no-op if none). All metrics -# carry only a ``model`` attribute to keep cardinality bounded; resource -# attributes (service.name, k8s.*, etc.) are added by the application's OTel -# resource configuration. -_meter = metrics.get_meter("agentex.openai_agents.streaming") -_ttft_ms = _meter.create_histogram( - name="agentex.llm.ttft", - unit="ms", - description="Time from streaming-request start to first content token (ms)", -) -_tps = _meter.create_histogram( - name="agentex.llm.tps", - unit="tokens/s", - description="Output tokens per second across the streaming response", -) -_input_tokens = _meter.create_counter( - name="agentex.llm.input_tokens", - unit="tokens", - description="Total input tokens sent to the LLM", -) -_output_tokens = _meter.create_counter( - name="agentex.llm.output_tokens", - unit="tokens", - description="Total output tokens returned by the LLM", -) -_cached_input_tokens = _meter.create_counter( - name="agentex.llm.cached_input_tokens", - unit="tokens", - description="Subset of input tokens served from prompt cache", -) -_reasoning_tokens = _meter.create_counter( - name="agentex.llm.reasoning_tokens", - unit="tokens", - description="Output tokens spent on reasoning (subset of output_tokens)", -) +# OTel metrics for LLM streaming behavior. 
Instruments are created lazily on +# first use so the meter resolves to whatever MeterProvider the application +# eventually configures, even if that happens after this module is imported. +# All metrics carry only a ``model`` attribute to keep cardinality bounded; +# resource attributes (service.name, k8s.*, etc.) come from the application's +# OTel resource configuration. +class _StreamingMetrics: + """Lazily-created OTel instruments for streaming LLM telemetry.""" + + def __init__(self) -> None: + meter = metrics.get_meter("agentex.openai_agents.streaming") + self.ttft_ms = meter.create_histogram( + name="agentex.llm.ttft", + unit="ms", + description="Time from request submission to first content token (ms)", + ) + # Note: TPS denominator is the model-generation window + # (last_token_time - first_token_time), not total stream wall time. + # This isolates raw model throughput from event-loop / tool-call latency. + self.tps = meter.create_histogram( + name="agentex.llm.tps", + unit="tokens/s", + description="Output tokens per second over the generation window", + ) + self.input_tokens = meter.create_counter( + name="agentex.llm.input_tokens", + unit="tokens", + description="Total input tokens sent to the LLM", + ) + self.output_tokens = meter.create_counter( + name="agentex.llm.output_tokens", + unit="tokens", + description="Total output tokens returned by the LLM", + ) + self.cached_input_tokens = meter.create_counter( + name="agentex.llm.cached_input_tokens", + unit="tokens", + description="Subset of input tokens served from prompt cache", + ) + self.reasoning_tokens = meter.create_counter( + name="agentex.llm.reasoning_tokens", + unit="tokens", + description="Output tokens spent on reasoning (subset of output_tokens)", + ) + + +_streaming_metrics: Optional[_StreamingMetrics] = None + + +def _get_streaming_metrics() -> _StreamingMetrics: + """Return the streaming metrics singleton, creating it on first use.""" + global _streaming_metrics + if _streaming_metrics is None: + _streaming_metrics = _StreamingMetrics() + return _streaming_metrics def _serialize_item(item: Any) -> dict[str, Any]: @@ -632,7 +651,11 @@ async def get_response( # endpoints recognize this parameter, so we don't auto-inject a default. prompt_cache_key = extra_args.pop("prompt_cache_key", NOT_GIVEN) - # Create the response stream using Responses API + # Create the response stream using Responses API. + # Bookmark request start *before* the await so ttft captures the full + # user-perceived latency (HTTP round-trip + model TTFB), not just the + # post-connect event-loop delay. + stream_start_perf = time.perf_counter() logger.debug(f"[TemporalStreamingModel] Creating response stream with Responses API") stream = await self.client.responses.create( # type: ignore[call-overload] @@ -682,12 +705,12 @@ async def get_response( reasoning_summaries = [] reasoning_contents = [] event_count = 0 - # Wall-clock instrumentation for ttft / tps / tpot. ``stream_start_perf`` - # bookmarks just before the event loop so the timer captures only the - # streaming portion, not request setup. ``first_token_at`` is set on - # the first content delta (text or reasoning summary). - stream_start_perf = time.perf_counter() + # ttft / tps instrumentation. ``stream_start_perf`` is set above, + # before the responses.create() await, so it captures the full + # request-to-first-token latency. ``first_token_at`` and + # ``last_token_at`` bracket the model-generation window for tps. 
first_token_at: Optional[float] = None + last_token_at: Optional[float] = None # We expect task_id to always be provided for streaming if not task_id: @@ -767,9 +790,14 @@ async def get_response( # Handle text streaming delta = getattr(event, 'delta', '') - # First content-bearing event in this stream — bookmark for ttft. + # Bookmark first/last content-bearing events for ttft and tps. + # last_token_at is updated on every delta so tps measures only + # the model-generation window, not subsequent tool-call / + # event-handler time. + now_perf = time.perf_counter() if first_token_at is None: - first_token_at = time.perf_counter() + first_token_at = now_perf + last_token_at = now_perf if isinstance(event, ResponseReasoningSummaryTextDeltaEvent) and reasoning_context: # Stream reasoning summary deltas - these are the actual reasoning tokens! @@ -1037,16 +1065,24 @@ async def get_response( # no-op if the application hasn't configured a MeterProvider, so this # is safe to do unconditionally. We only emit ttft / tps when their # input data is actually meaningful (got a content delta, got tokens). + m = _get_streaming_metrics() metric_attrs = {"model": self.model_name} - stream_duration_s = time.perf_counter() - stream_start_perf - _input_tokens.add(usage.input_tokens or 0, metric_attrs) - _output_tokens.add(usage.output_tokens or 0, metric_attrs) - _cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs) - _reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs) + m.input_tokens.add(usage.input_tokens or 0, metric_attrs) + m.output_tokens.add(usage.output_tokens or 0, metric_attrs) + m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs) + m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs) if first_token_at is not None: - _ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) - if (usage.output_tokens or 0) > 0 and stream_duration_s > 0: - _tps.record(usage.output_tokens / stream_duration_s, metric_attrs) + m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) + # tps denominator is the generation window (first→last delta), not + # total stream wall time — see _StreamingMetrics for rationale. + if ( + first_token_at is not None + and last_token_at is not None + and last_token_at > first_token_at + and (usage.output_tokens or 0) > 0 + ): + generation_window_s = last_token_at - first_token_at + m.tps.record(usage.output_tokens / generation_window_s, metric_attrs) # Return the response. response_id is the server-issued id from # ResponseCompletedEvent.response.id, or None when the stream ended From 6209b20582b9a7e0f3c46a9d5e90487363b73579 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 7 May 2026 12:52:18 -0400 Subject: [PATCH 3/9] review: include tool-call argument tokens in tps generation window MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Bookmark first/last token timestamps on ResponseFunctionCallArgumentsDeltaEvent too so the tps generation window covers all event types whose tokens land in usage.output_tokens. Previously the numerator counted argument tokens but the denominator excluded their generation time, inflating tps for tool-heavy responses. - Lifted the bookmarking out of the text-delta branch into a single up-front check covering all four token-producing event types — cleaner than duplicating across branches. 
- Documented the single-token skip case (window collapses to 0) inline at the guard. TPS is undefined for a one-token response so emitting nothing is correct; the comment makes the intent visible to future readers. --- .../models/temporal_streaming_model.py | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index e711e0051..84d45b1aa 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -725,6 +725,20 @@ async def get_response( # Log event type logger.debug(f"[TemporalStreamingModel] Event {event_count}: {type(event).__name__}") + # Bookmark first/last token-producing events for ttft and tps. + # Includes function-call argument deltas so the generation window + # covers every event type whose tokens land in usage.output_tokens. + if isinstance(event, ( + ResponseTextDeltaEvent, + ResponseReasoningTextDeltaEvent, + ResponseReasoningSummaryTextDeltaEvent, + ResponseFunctionCallArgumentsDeltaEvent, + )): + now_perf = time.perf_counter() + if first_token_at is None: + first_token_at = now_perf + last_token_at = now_perf + # Handle different event types using isinstance for type safety if isinstance(event, ResponseOutputItemAddedEvent): # New output item (reasoning, function call, or message) @@ -790,15 +804,6 @@ async def get_response( # Handle text streaming delta = getattr(event, 'delta', '') - # Bookmark first/last content-bearing events for ttft and tps. - # last_token_at is updated on every delta so tps measures only - # the model-generation window, not subsequent tool-call / - # event-handler time. - now_perf = time.perf_counter() - if first_token_at is None: - first_token_at = now_perf - last_token_at = now_perf - if isinstance(event, ResponseReasoningSummaryTextDeltaEvent) and reasoning_context: # Stream reasoning summary deltas - these are the actual reasoning tokens! try: @@ -1075,6 +1080,9 @@ async def get_response( m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) # tps denominator is the generation window (first→last delta), not # total stream wall time — see _StreamingMetrics for rationale. + # Note: single-token responses (where first_token_at == last_token_at, + # e.g. a one-token tool-result acknowledgement) collapse the window + # to 0 and are intentionally skipped — TPS is undefined in that case. if ( first_token_at is not None and last_token_at is not None From da85d7b7eb84136f92909e154af69bc6f1c496f0 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 7 May 2026 13:07:47 -0400 Subject: [PATCH 4/9] review (stas): extract llm metrics to core/observability + add request counter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two follow-up changes from the PR review: 1. Move the LLM metric instruments from _StreamingMetrics in temporal_streaming_model.py to a new module: agentex.lib.core.observability.llm_metrics Public API: get_llm_metrics() returns a singleton LLMMetrics with the same six instruments (ttft, tps, input_tokens, output_tokens, cached_input_tokens, reasoning_tokens) plus a new requests counter. 
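
   For illustration, recording against the shared instruments looks
   roughly like this (sketch; the literal attribute and token values
   stand in for a real call's model name and captured Usage numbers):

       from agentex.lib.core.observability.llm_metrics import get_llm_metrics

       m = get_llm_metrics()
       attrs = {"model": "gpt-5"}  # the only metric attribute
       m.requests.add(1, {**attrs, "status": "success"})
       m.input_tokens.add(1200, attrs)  # values come from the captured Usage
       m.ttft_ms.record(850.0, attrs)   # ms from request start to first token
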
This makes the temporal+openai_agents plugin one of several future call sites — the sync ACP path and the Claude SDK plugin can record to the same instruments without redefining names, units, or descriptions. Keeps cross-provider naming consistent. 2. Add agentex.llm.requests counter with a status label so 429s, 5xxs, timeouts, and other failures are observable on the SDK side without scraping logs. classify_status() maps exception types to a small fixed set (success / rate_limit / server_error / client_error / timeout / network_error / other_error) by class name, so it works across OpenAI, Anthropic, and other provider SDKs that use similar exception naming. Recorded in two places: success path (alongside token counters) and the existing get_response except handler (so terminal failures emit a counter event before re-raising). Cardinality remains bounded — model + status (7 values) on the counter; all other metrics keep just `model`. --- .../lib/core/observability/__init__.py | 0 .../lib/core/observability/llm_metrics.py | 112 ++++++++++++++++++ .../models/temporal_streaming_model.py | 77 +++--------- 3 files changed, 128 insertions(+), 61 deletions(-) create mode 100644 src/agentex/lib/core/observability/__init__.py create mode 100644 src/agentex/lib/core/observability/llm_metrics.py diff --git a/src/agentex/lib/core/observability/__init__.py b/src/agentex/lib/core/observability/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/agentex/lib/core/observability/llm_metrics.py b/src/agentex/lib/core/observability/llm_metrics.py new file mode 100644 index 000000000..a8a9d0041 --- /dev/null +++ b/src/agentex/lib/core/observability/llm_metrics.py @@ -0,0 +1,112 @@ +"""OTel metrics for LLM calls. + +Single source of truth for LLM-call instrumentation across all agentex code +paths — temporal+openai_agents streaming today, sync ACP and the Claude SDK +plugin in future PRs. Centralizing the instrument definitions here means +those follow-ups don't need to redefine the metric names, units, or +description strings; they import ``get_llm_metrics()`` and record values. + +The meter is no-op when the application hasn't configured a ``MeterProvider``, +so importing this module is safe for runtimes that don't use OTel. Instruments +are created lazily on first ``get_llm_metrics()`` call so a ``MeterProvider`` +configured *after* this module is imported still binds correctly. + +Cardinality is bounded: +- All metrics carry only ``model`` (the LLM model name). +- ``requests`` additionally carries ``status``, drawn from a small fixed set + (see ``classify_status``). + +Resource attributes (``service.name``, ``k8s.*``, etc.) come from the +application's OTel resource configuration and are added to every series +automatically. +""" + +from __future__ import annotations + +from typing import Optional + +from opentelemetry import metrics + + +class LLMMetrics: + """Lazily-created OTel instruments for LLM call telemetry.""" + + def __init__(self) -> None: + meter = metrics.get_meter("agentex.llm") + self.requests = meter.create_counter( + name="agentex.llm.requests", + unit="1", + description=( + "LLM call count tagged with status (success / rate_limit / " + "server_error / client_error / timeout / network_error / " + "other_error). Use to alert on 429s, 5xxs, etc." 
+ ), + ) + self.ttft_ms = meter.create_histogram( + name="agentex.llm.ttft", + unit="ms", + description="Time from request submission to first content token (ms)", + ) + # Note: TPS denominator is the model-generation window + # (last_token_time - first_token_time), not total stream wall time. + # This isolates raw model throughput from event-loop / tool-call latency. + self.tps = meter.create_histogram( + name="agentex.llm.tps", + unit="tokens/s", + description="Output tokens per second over the generation window", + ) + self.input_tokens = meter.create_counter( + name="agentex.llm.input_tokens", + unit="tokens", + description="Total input tokens sent to the LLM", + ) + self.output_tokens = meter.create_counter( + name="agentex.llm.output_tokens", + unit="tokens", + description="Total output tokens returned by the LLM", + ) + self.cached_input_tokens = meter.create_counter( + name="agentex.llm.cached_input_tokens", + unit="tokens", + description="Subset of input tokens served from prompt cache", + ) + self.reasoning_tokens = meter.create_counter( + name="agentex.llm.reasoning_tokens", + unit="tokens", + description="Output tokens spent on reasoning (subset of output_tokens)", + ) + + +_llm_metrics: Optional[LLMMetrics] = None + + +def get_llm_metrics() -> LLMMetrics: + """Return the LLM metrics singleton, creating it on first use.""" + global _llm_metrics + if _llm_metrics is None: + _llm_metrics = LLMMetrics() + return _llm_metrics + + +def classify_status(exc: Optional[BaseException]) -> str: + """Categorize an LLM call's outcome into a small fixed set of status labels. + + A successful call returns ``"success"``. Exceptions are mapped by type name + so we don't depend on a specific provider SDK's exception class hierarchy: + OpenAI, Anthropic, and other providers all use names like ``RateLimitError``, + ``APITimeoutError``, ``InternalServerError``, etc. + """ + if exc is None: + return "success" + name = type(exc).__name__ + if "RateLimit" in name: + return "rate_limit" + if "Timeout" in name: + return "timeout" + if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")): + return "server_error" + if "Connection" in name: + return "network_error" + if any(s in name for s in ("BadRequest", "Authentication", "Permission", "NotFound", "Conflict", "UnprocessableEntity")): + return "client_error" + return "other_error" diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 84d45b1aa..5d887af7f 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -27,12 +27,12 @@ CodeInterpreterTool, ImageGenerationTool, ) -from opentelemetry import metrics from agents.computer import Computer, AsyncComputer # Re-export the canonical StreamingMode literal from the streaming service so # all layers share a single definition. from agentex.lib.core.services.adk.streaming import StreamingMode as StreamingMode +from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics try: from agents.tool import ShellTool # type: ignore[attr-defined] @@ -80,61 +80,9 @@ logger = make_logger("agentex.temporal.streaming") -# OTel metrics for LLM streaming behavior. 
Instruments are created lazily on -# first use so the meter resolves to whatever MeterProvider the application -# eventually configures, even if that happens after this module is imported. -# All metrics carry only a ``model`` attribute to keep cardinality bounded; -# resource attributes (service.name, k8s.*, etc.) come from the application's -# OTel resource configuration. -class _StreamingMetrics: - """Lazily-created OTel instruments for streaming LLM telemetry.""" - - def __init__(self) -> None: - meter = metrics.get_meter("agentex.openai_agents.streaming") - self.ttft_ms = meter.create_histogram( - name="agentex.llm.ttft", - unit="ms", - description="Time from request submission to first content token (ms)", - ) - # Note: TPS denominator is the model-generation window - # (last_token_time - first_token_time), not total stream wall time. - # This isolates raw model throughput from event-loop / tool-call latency. - self.tps = meter.create_histogram( - name="agentex.llm.tps", - unit="tokens/s", - description="Output tokens per second over the generation window", - ) - self.input_tokens = meter.create_counter( - name="agentex.llm.input_tokens", - unit="tokens", - description="Total input tokens sent to the LLM", - ) - self.output_tokens = meter.create_counter( - name="agentex.llm.output_tokens", - unit="tokens", - description="Total output tokens returned by the LLM", - ) - self.cached_input_tokens = meter.create_counter( - name="agentex.llm.cached_input_tokens", - unit="tokens", - description="Subset of input tokens served from prompt cache", - ) - self.reasoning_tokens = meter.create_counter( - name="agentex.llm.reasoning_tokens", - unit="tokens", - description="Output tokens spent on reasoning (subset of output_tokens)", - ) - - -_streaming_metrics: Optional[_StreamingMetrics] = None - - -def _get_streaming_metrics() -> _StreamingMetrics: - """Return the streaming metrics singleton, creating it on first use.""" - global _streaming_metrics - if _streaming_metrics is None: - _streaming_metrics = _StreamingMetrics() - return _streaming_metrics +# LLM metrics live in agentex.lib.core.observability.llm_metrics so other +# code paths (sync ACP, Claude SDK plugin, future provider integrations) +# can share the same instrument definitions without redefining names. def _serialize_item(item: Any) -> dict[str, Any]: @@ -1070,8 +1018,9 @@ async def get_response( # no-op if the application hasn't configured a MeterProvider, so this # is safe to do unconditionally. We only emit ttft / tps when their # input data is actually meaningful (got a content delta, got tokens). - m = _get_streaming_metrics() + m = get_llm_metrics() metric_attrs = {"model": self.model_name} + m.requests.add(1, {**metric_attrs, "status": "success"}) m.input_tokens.add(usage.input_tokens or 0, metric_attrs) m.output_tokens.add(usage.output_tokens or 0, metric_attrs) m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs) @@ -1079,10 +1028,10 @@ async def get_response( if first_token_at is not None: m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) # tps denominator is the generation window (first→last delta), not - # total stream wall time — see _StreamingMetrics for rationale. - # Note: single-token responses (where first_token_at == last_token_at, - # e.g. a one-token tool-result acknowledgement) collapse the window - # to 0 and are intentionally skipped — TPS is undefined in that case. + # total stream wall time — see LLMMetrics for rationale. 
Single-token
+            # responses (where first_token_at == last_token_at, e.g. a one-token
+            # tool-result acknowledgement) collapse the window to 0 and are
+            # intentionally skipped — TPS is undefined in that case.
             if (
                 first_token_at is not None
                 and last_token_at is not None
@@ -1107,6 +1056,12 @@ async def get_response(
 
         except Exception as e:
             logger.error(f"Error using Responses API: {e}")
+            # Emit a request-counter event so 429s, 5xxs, timeouts, etc. are
+            # observable on the SDK side. Status histograms / token counters
+            # only fire on successful completion above.
+            get_llm_metrics().requests.add(
+                1, {"model": self.model_name, "status": classify_status(e)}
+            )
             raise
 
     # The _get_response_with_responses_api method has been merged into get_response above

From b08e48f4bfae6aac6500c0353ce0c0bbd85ff166 Mon Sep 17 00:00:00 2001
From: Devon Peticolas
Date: Fri, 8 May 2026 00:24:23 -0400
Subject: [PATCH 5/9] feat(streaming): add ttat (time-to-first-answering-token)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

ttft fires on the first content delta of any kind, which for reasoning
models means the first reasoning chunk, so ttft looks fast even when
the user-perceived latency is much longer. ttat fires only on the first
user-visible answer token (text delta or tool-call arguments delta),
excluding reasoning chunks. For non-reasoning models the two are equal;
for gpt-5-class / o-series models they differ by the reasoning
duration.

This pairs with ttft for "did the model start thinking quickly?" vs
"how long did the user wait for an answer?" — both are valuable signals
that mean different things on reasoning workloads.

Implementation: a third bookmark variable (``first_answer_at``) set
inside the same up-front event-type check, restricted to
ResponseTextDeltaEvent / ResponseFunctionCallArgumentsDeltaEvent. Adds
one new histogram (``agentex.llm.ttat``) — same labels and units as
ttft.
---
 .../lib/core/observability/llm_metrics.py    |  9 +++++++++
 .../models/temporal_streaming_model.py       | 18 ++++++++++++++++--
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/src/agentex/lib/core/observability/llm_metrics.py b/src/agentex/lib/core/observability/llm_metrics.py
index a8a9d0041..b15e83824 100644
--- a/src/agentex/lib/core/observability/llm_metrics.py
+++ b/src/agentex/lib/core/observability/llm_metrics.py
@@ -47,6 +47,15 @@ def __init__(self) -> None:
             unit="ms",
             description="Time from request submission to first content token (ms)",
         )
+        # ttat (time-to-first-answering-token) is distinct from ttft for reasoning
+        # models: ttft fires on the first reasoning chunk (which arrives quickly),
+        # while ttat fires on the first user-visible answer token (text or tool
+        # call). For non-reasoning models the two are equal.
+        self.ttat_ms = meter.create_histogram(
+            name="agentex.llm.ttat",
+            unit="ms",
+            description="Time from request submission to first answering token (text or tool-call delta) — excludes reasoning chunks",
+        )
         # Note: TPS denominator is the model-generation window
         # (last_token_time - first_token_time), not total stream wall time.
         # This isolates raw model throughput from event-loop / tool-call latency.
diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 5d887af7f..537b68f61 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -653,12 +653,16 @@ async def get_response( reasoning_summaries = [] reasoning_contents = [] event_count = 0 - # ttft / tps instrumentation. ``stream_start_perf`` is set above, - # before the responses.create() await, so it captures the full + # ttft / ttat / tps instrumentation. ``stream_start_perf`` is set + # above, before the responses.create() await, so it captures the full # request-to-first-token latency. ``first_token_at`` and # ``last_token_at`` bracket the model-generation window for tps. + # ``first_answer_at`` is set on the first user-visible answer token + # (text or tool-call delta) and excludes reasoning chunks, so ttat + # measures the latency users actually perceive on reasoning models. first_token_at: Optional[float] = None last_token_at: Optional[float] = None + first_answer_at: Optional[float] = None # We expect task_id to always be provided for streaming if not task_id: @@ -686,6 +690,14 @@ async def get_response( if first_token_at is None: first_token_at = now_perf last_token_at = now_perf + # ttat: first user-visible answer token (text or tool call), + # excluding reasoning chunks. Equal to ttft for non-reasoning + # models; differs by reasoning duration for reasoning models. + if first_answer_at is None and isinstance(event, ( + ResponseTextDeltaEvent, + ResponseFunctionCallArgumentsDeltaEvent, + )): + first_answer_at = now_perf # Handle different event types using isinstance for type safety if isinstance(event, ResponseOutputItemAddedEvent): @@ -1027,6 +1039,8 @@ async def get_response( m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs) if first_token_at is not None: m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) + if first_answer_at is not None: + m.ttat_ms.record((first_answer_at - stream_start_perf) * 1000, metric_attrs) # tps denominator is the generation window (first→last delta), not # total stream wall time — see LLMMetrics for rationale. Single-token # responses (where first_token_at == last_token_at, e.g. a one-token From 1935aa97e08000bff0bd7d9d4f3d4e7b8e4f5eda Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Fri, 8 May 2026 00:27:49 -0400 Subject: [PATCH 6/9] review (greptile): swallow metric-emission errors in except handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If get_llm_metrics().requests.add() raises (misbehaving exporter, OTel SDK bug, network blip mid-export), the original LLM exception would be shadowed by the metric error. Callers — retry logic, circuit breakers, the OpenAI Agents Temporal plugin's retryable/non-retryable classifier — inspect the typed exception (RateLimitError, APITimeoutError, etc.) and would silently break with an unexpected OTel exception in its place. Wrap the .add() call in a bare try/except so the metric is best-effort and the typed LLM exception always propagates. 
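
The shadowing, in miniature: Python propagates whatever an except block
raises in place of the exception being handled. Standalone sketch with
stand-in names, not agentex code:

    class RateLimitError(Exception): ...

    def emit_metric() -> None:
        raise RuntimeError("exporter exploded")  # misbehaving exporter

    try:
        raise RateLimitError("429")  # the typed LLM error
    except Exception:
        emit_metric()  # raises RuntimeError; it now propagates instead
        raise          # never reached, so callers catching RateLimitError
                       # see an unexpected RuntimeError
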
---
 .../models/temporal_streaming_model.py       | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
index 537b68f61..910be38d7 100644
--- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
+++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
@@ -1072,10 +1072,16 @@ async def get_response(
             logger.error(f"Error using Responses API: {e}")
             # Emit a request-counter event so 429s, 5xxs, timeouts, etc. are
             # observable on the SDK side. Status histograms / token counters
-            # only fire on successful completion above.
-            get_llm_metrics().requests.add(
-                1, {"model": self.model_name, "status": classify_status(e)}
-            )
+            # only fire on successful completion above. Wrapped in a bare
+            # try/except so a misbehaving exporter can't shadow the original
+            # LLM exception — callers (retry logic, circuit breakers) need
+            # to see the typed RateLimitError / APITimeoutError / etc.
+            try:
+                get_llm_metrics().requests.add(
+                    1, {"model": self.model_name, "status": classify_status(e)}
+                )
+            except Exception:
+                pass
             raise
 
     # The _get_response_with_responses_api method has been merged into get_response above

From 65d2e81c4f625f734ea6bcff374da5ce69fa8f32 Mon Sep 17 00:00:00 2001
From: Devon Peticolas
Date: Fri, 8 May 2026 01:34:51 -0400
Subject: [PATCH 7/9] refactor: emit LLM token / request metrics via RunHooks
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Splits metric emission by what each layer can see:

- agentex.lib.core.observability.llm_metrics_hooks.LLMMetricsHooks
  (RunHooks subclass) emits agentex.llm.requests + the four token
  counters in on_llm_end. Works for any RunHooks-aware path.

- TemporalStreamingHooks now inherits from LLMMetricsHooks so the async
  path picks up the same metrics automatically.

- TemporalStreamingModel keeps only the streaming-only metrics (ttft,
  ttat, tps) — those need per-chunk visibility that hooks can't
  provide. Failure path uses the new record_llm_failure helper.

This makes adding the sync ACP path trivial later: pass
LLMMetricsHooks() to Runner.run from services/adk/providers/openai.py
and it'll emit the same metrics with no double-counting.
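
Roughly, for that future call site (sketch, not part of this diff; the
agent construction is illustrative, while hooks= is the real Runner.run
parameter in the OpenAI Agents SDK):

    from agents import Agent, Runner

    from agentex.lib.core.observability.llm_metrics_hooks import LLMMetricsHooks

    agent = Agent(name="assistant", instructions="Answer briefly.")
    # inside an async context:
    result = await Runner.run(agent, "hello", hooks=LLMMetricsHooks())
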
Tests cover: - classify_status branches (rate_limit / timeout / server_error / network_error / client_error / other_error / success) - get_llm_metrics singleton + instrument presence - LLMMetricsHooks.on_llm_end emits requests + token counters with the right model attribute - Both the hooks path and record_llm_failure swallow exporter exceptions so callers don't break when metrics fail --- .../core/observability/llm_metrics_hooks.py | 46 ++++++ .../lib/core/observability/tests/__init__.py | 0 .../observability/tests/test_llm_metrics.py | 83 +++++++++++ .../tests/test_llm_metrics_hooks.py | 135 ++++++++++++++++++ .../plugins/openai_agents/hooks/hooks.py | 5 +- .../models/temporal_streaming_model.py | 41 ++---- 6 files changed, 279 insertions(+), 31 deletions(-) create mode 100644 src/agentex/lib/core/observability/llm_metrics_hooks.py create mode 100644 src/agentex/lib/core/observability/tests/__init__.py create mode 100644 src/agentex/lib/core/observability/tests/test_llm_metrics.py create mode 100644 src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py diff --git a/src/agentex/lib/core/observability/llm_metrics_hooks.py b/src/agentex/lib/core/observability/llm_metrics_hooks.py new file mode 100644 index 000000000..c874ec4e7 --- /dev/null +++ b/src/agentex/lib/core/observability/llm_metrics_hooks.py @@ -0,0 +1,46 @@ +"""``RunHooks`` adapter that emits per-call LLM metrics. + +Used by the sync ACP path and as a base class for ``TemporalStreamingHooks`` +on the async path, so token / request / cache metrics emit consistently +across both. Streaming-only metrics (ttft, ttat, tps) are emitted from the +streaming model itself, not here — hooks don't see individual chunks. +""" + +from __future__ import annotations + +from typing import Any + +from agents import Agent, RunHooks, ModelResponse, RunContextWrapper + +from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics + + +class LLMMetricsHooks(RunHooks): + """Emits ``agentex.llm.requests`` + token counters on every LLM call.""" + + async def on_llm_end( + self, + context: RunContextWrapper[Any], + agent: Agent[Any], + response: ModelResponse, + ) -> None: + del context # part of the RunHooks contract; unused here + m = get_llm_metrics() + attrs = {"model": str(agent.model) if agent.model else "unknown"} + try: + usage = response.usage + m.requests.add(1, {**attrs, "status": "success"}) + m.input_tokens.add(usage.input_tokens or 0, attrs) + m.output_tokens.add(usage.output_tokens or 0, attrs) + m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs) + m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs) + except Exception: + pass + + +def record_llm_failure(model: str, exc: BaseException) -> None: + """Best-effort counter bump for an LLM call that raised before ``on_llm_end``.""" + try: + get_llm_metrics().requests.add(1, {"model": model, "status": classify_status(exc)}) + except Exception: + pass diff --git a/src/agentex/lib/core/observability/tests/__init__.py b/src/agentex/lib/core/observability/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/agentex/lib/core/observability/tests/test_llm_metrics.py b/src/agentex/lib/core/observability/tests/test_llm_metrics.py new file mode 100644 index 000000000..d8ab62eba --- /dev/null +++ b/src/agentex/lib/core/observability/tests/test_llm_metrics.py @@ -0,0 +1,83 @@ +"""Tests for ``agentex.lib.core.observability.llm_metrics``.""" + +from __future__ import annotations + 
+import agentex.lib.core.observability.llm_metrics as llm_metrics +from agentex.lib.core.observability.llm_metrics import ( + LLMMetrics, + classify_status, + get_llm_metrics, +) + + +class TestClassifyStatus: + def test_none_is_success(self): + assert classify_status(None) == "success" + + def test_rate_limit(self): + class RateLimitError(Exception): + pass + + assert classify_status(RateLimitError()) == "rate_limit" + + def test_timeout(self): + class APITimeoutError(Exception): + pass + + assert classify_status(APITimeoutError()) == "timeout" + + def test_server_error(self): + class InternalServerError(Exception): + pass + + assert classify_status(InternalServerError()) == "server_error" + + class ServiceUnavailable(Exception): + pass + + assert classify_status(ServiceUnavailable()) == "server_error" + + def test_network_error(self): + class APIConnectionError(Exception): + pass + + assert classify_status(APIConnectionError()) == "network_error" + + def test_client_error(self): + for cls_name in ("BadRequestError", "AuthenticationError", "PermissionError"): + cls = type(cls_name, (Exception,), {}) + assert classify_status(cls()) == "client_error" + + def test_unknown_falls_back(self): + class WeirdProviderException(Exception): + pass + + assert classify_status(WeirdProviderException()) == "other_error" + + +class TestGetLLMMetrics: + def test_returns_llm_metrics_instance(self, monkeypatch): + monkeypatch.setattr(llm_metrics, "_llm_metrics", None) + m = get_llm_metrics() + assert isinstance(m, LLMMetrics) + + def test_singleton_returns_same_instance(self, monkeypatch): + monkeypatch.setattr(llm_metrics, "_llm_metrics", None) + first = get_llm_metrics() + second = get_llm_metrics() + assert first is second + + def test_instruments_exist(self, monkeypatch): + monkeypatch.setattr(llm_metrics, "_llm_metrics", None) + m = get_llm_metrics() + for name in ( + "requests", + "ttft_ms", + "ttat_ms", + "tps", + "input_tokens", + "output_tokens", + "cached_input_tokens", + "reasoning_tokens", + ): + assert hasattr(m, name), f"missing instrument: {name}" diff --git a/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py b/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py new file mode 100644 index 000000000..857b6f249 --- /dev/null +++ b/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py @@ -0,0 +1,135 @@ +"""Tests for ``agentex.lib.core.observability.llm_metrics_hooks``.""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +import agentex.lib.core.observability.llm_metrics_hooks as hooks_module +from agentex.lib.core.observability.llm_metrics_hooks import ( + LLMMetricsHooks, + record_llm_failure, +) + + +def _mock_response( + *, + input_tokens: int = 100, + output_tokens: int = 50, + cached_tokens: int = 30, + reasoning_tokens: int = 10, +) -> MagicMock: + response = MagicMock() + response.usage.input_tokens = input_tokens + response.usage.output_tokens = output_tokens + response.usage.input_tokens_details.cached_tokens = cached_tokens + response.usage.output_tokens_details.reasoning_tokens = reasoning_tokens + return response + + +def _mock_agent(model: str = "gpt-5") -> MagicMock: + agent = MagicMock() + agent.model = model + return agent + + +class TestLLMMetricsHooksOnLLMEnd: + @pytest.mark.asyncio + async def test_emits_success_request_counter(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + await LLMMetricsHooks().on_llm_end( + 
context=MagicMock(), + agent=_mock_agent("gpt-5"), + response=_mock_response(), + ) + + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"}) + + @pytest.mark.asyncio + async def test_emits_token_counters(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent("gpt-5"), + response=_mock_response( + input_tokens=200, + output_tokens=75, + cached_tokens=50, + reasoning_tokens=20, + ), + ) + + attrs = {"model": "gpt-5"} + m.input_tokens.add.assert_called_once_with(200, attrs) + m.output_tokens.add.assert_called_once_with(75, attrs) + m.cached_input_tokens.add.assert_called_once_with(50, attrs) + m.reasoning_tokens.add.assert_called_once_with(20, attrs) + + @pytest.mark.asyncio + async def test_zero_tokens_emit_zero_not_skip(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_mock_response(input_tokens=0, output_tokens=0, cached_tokens=0, reasoning_tokens=0), + ) + + m.input_tokens.add.assert_called_once_with(0, {"model": "gpt-5"}) + m.output_tokens.add.assert_called_once_with(0, {"model": "gpt-5"}) + + @pytest.mark.asyncio + async def test_unknown_model_falls_back(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + agent = MagicMock() + agent.model = None + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=agent, + response=_mock_response(), + ) + + m.requests.add.assert_called_once_with(1, {"model": "unknown", "status": "success"}) + + @pytest.mark.asyncio + async def test_swallows_exporter_failure(self, monkeypatch): + m = MagicMock() + m.requests.add.side_effect = RuntimeError("exporter exploded") + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + # Should not raise — caller's flow must not break on metric failure. + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_mock_response(), + ) + + +class TestRecordLLMFailure: + def test_emits_classified_status(self, monkeypatch): + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + class RateLimitError(Exception): + pass + + record_llm_failure("gpt-5", RateLimitError()) + + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "rate_limit"}) + + def test_swallows_exporter_failure(self, monkeypatch): + m = MagicMock() + m.requests.add.side_effect = RuntimeError("exporter exploded") + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + # Should not raise. 
+ record_llm_failure("gpt-5", Exception("upstream")) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py index cc27006fc..758b0db27 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/hooks/hooks.py @@ -8,18 +8,19 @@ from typing import Any, override from datetime import timedelta -from agents import Tool, Agent, RunHooks, RunContextWrapper +from agents import Tool, Agent, RunContextWrapper from temporalio import workflow from agents.tool_context import ToolContext from agentex.types.text_content import TextContent from agentex.types.task_message_content import ToolRequestContent, ToolResponseContent +from agentex.lib.core.observability.llm_metrics_hooks import LLMMetricsHooks from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content logger = logging.getLogger(__name__) -class TemporalStreamingHooks(RunHooks): +class TemporalStreamingHooks(LLMMetricsHooks): """Convenience hooks class for streaming OpenAI Agent lifecycle events to the AgentEx UI. This class automatically streams agent lifecycle events (tool calls, handoffs) to the diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 910be38d7..7ccc6627a 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -32,7 +32,8 @@ # Re-export the canonical StreamingMode literal from the streaming service so # all layers share a single definition. from agentex.lib.core.services.adk.streaming import StreamingMode as StreamingMode -from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics +from agentex.lib.core.observability.llm_metrics import get_llm_metrics +from agentex.lib.core.observability.llm_metrics_hooks import record_llm_failure try: from agents.tool import ShellTool # type: ignore[attr-defined] @@ -1026,34 +1027,24 @@ async def get_response( span.output = output_data - # Emit LLM metrics derived from the captured stream. The meter is a - # no-op if the application hasn't configured a MeterProvider, so this - # is safe to do unconditionally. We only emit ttft / tps when their - # input data is actually meaningful (got a content delta, got tokens). + # Streaming-only metrics. Token counters and the success request + # counter are emitted by LLMMetricsHooks.on_llm_end so they fire + # consistently across streaming and non-streaming paths. m = get_llm_metrics() metric_attrs = {"model": self.model_name} - m.requests.add(1, {**metric_attrs, "status": "success"}) - m.input_tokens.add(usage.input_tokens or 0, metric_attrs) - m.output_tokens.add(usage.output_tokens or 0, metric_attrs) - m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, metric_attrs) - m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, metric_attrs) if first_token_at is not None: m.ttft_ms.record((first_token_at - stream_start_perf) * 1000, metric_attrs) if first_answer_at is not None: m.ttat_ms.record((first_answer_at - stream_start_perf) * 1000, metric_attrs) - # tps denominator is the generation window (first→last delta), not - # total stream wall time — see LLMMetrics for rationale. 
Single-token - # responses (where first_token_at == last_token_at, e.g. a one-token - # tool-result acknowledgement) collapse the window to 0 and are - # intentionally skipped — TPS is undefined in that case. + # Single-token responses collapse the generation window to 0; tps + # is undefined and skipped. if ( first_token_at is not None and last_token_at is not None and last_token_at > first_token_at and (usage.output_tokens or 0) > 0 ): - generation_window_s = last_token_at - first_token_at - m.tps.record(usage.output_tokens / generation_window_s, metric_attrs) + m.tps.record(usage.output_tokens / (last_token_at - first_token_at), metric_attrs) # Return the response. response_id is the server-issued id from # ResponseCompletedEvent.response.id, or None when the stream ended @@ -1070,18 +1061,10 @@ async def get_response( except Exception as e: logger.error(f"Error using Responses API: {e}") - # Emit a request-counter event so 429s, 5xxs, timeouts, etc. are - # observable on the SDK side. Status histograms / token counters - # only fire on successful completion above. Wrapped in a bare - # try/except so a misbehaving exporter can't shadow the original - # LLM exception — callers (retry logic, circuit breakers) need - # to see the typed RateLimitError / APITimeoutError / etc. - try: - get_llm_metrics().requests.add( - 1, {"model": self.model_name, "status": classify_status(e)} - ) - except Exception: - pass + # LLMMetricsHooks.on_llm_end doesn't fire on error, so emit the + # failure counter here. Best-effort so the typed LLM exception + # always propagates intact for retry / circuit-breaker logic. + record_llm_failure(self.model_name, e) raise # The _get_response_with_responses_api method has been merged into get_response above From 229a3f5f96fb3e02366a34b29baa803e0fbbb287 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Fri, 8 May 2026 01:41:15 -0400 Subject: [PATCH 8/9] review: harden against malformed Usage shapes (litellm / non-OpenAI providers) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a provider returns a ModelResponse with a Usage shape the OpenAI Agents SDK didn't fully normalize — missing input_tokens_details, missing usage entirely, None token values — we want to record what we can and skip the rest, never crash the caller. - Move requests.add outside the usage-extraction try block so the success counter still fires when usage access raises (e.g., None). - Add three tests covering: response with raising .usage property, Usage missing input_tokens_details, and Usage with all-None token values. --- .../core/observability/llm_metrics_hooks.py | 11 ++- .../tests/test_llm_metrics_hooks.py | 80 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/src/agentex/lib/core/observability/llm_metrics_hooks.py b/src/agentex/lib/core/observability/llm_metrics_hooks.py index c874ec4e7..64c5284ce 100644 --- a/src/agentex/lib/core/observability/llm_metrics_hooks.py +++ b/src/agentex/lib/core/observability/llm_metrics_hooks.py @@ -27,9 +27,18 @@ async def on_llm_end( del context # part of the RunHooks contract; unused here m = get_llm_metrics() attrs = {"model": str(agent.model) if agent.model else "unknown"} + # Request counter only depends on agent.model, so emit it first and + # outside the usage-extraction try block. Token counters reach into + # nested optional fields and are best-effort: a non-OpenAI provider + # (litellm-routed Anthropic, etc.) 
may return a Usage shape missing + # input_tokens_details / output_tokens_details — we emit zeros where + # we can and skip the rest rather than crash the caller. try: - usage = response.usage m.requests.add(1, {**attrs, "status": "success"}) + except Exception: + pass + try: + usage = response.usage m.input_tokens.add(usage.input_tokens or 0, attrs) m.output_tokens.add(usage.output_tokens or 0, attrs) m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs) diff --git a/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py b/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py index 857b6f249..a2cef95b8 100644 --- a/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py +++ b/src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py @@ -113,6 +113,86 @@ async def test_swallows_exporter_failure(self, monkeypatch): response=_mock_response(), ) + @pytest.mark.asyncio + async def test_missing_usage_still_emits_request_counter(self, monkeypatch): + """Provider returns a response without `usage` — caller shouldn't crash, + and we should still record the success request counter.""" + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + class _Response: + @property + def usage(self): + raise AttributeError("no usage") + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_Response(), # type: ignore[arg-type] + ) + + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"}) + m.input_tokens.add.assert_not_called() + m.output_tokens.add.assert_not_called() + + @pytest.mark.asyncio + async def test_missing_token_details_skips_those_counters(self, monkeypatch): + """Provider returns Usage without input_tokens_details (e.g. some + litellm wrappers / non-OpenAI providers): top-level token counts + still emit; the nested cached/reasoning counters are skipped.""" + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + class _Usage: + input_tokens = 100 + output_tokens = 50 + + @property + def input_tokens_details(self): + raise AttributeError("no details") + + class _Response: + usage = _Usage() + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=_Response(), # type: ignore[arg-type] + ) + + # Request counter still fires (it's outside the usage-extraction try). + m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"}) + # input_tokens.add fires before the nested attribute access. + m.input_tokens.add.assert_called_once_with(100, {"model": "gpt-5"}) + # cached_input_tokens / reasoning_tokens skipped — the AttributeError + # bailed before they could be called. 
+ m.cached_input_tokens.add.assert_not_called() + m.reasoning_tokens.add.assert_not_called() + + @pytest.mark.asyncio + async def test_none_token_values_emit_as_zero(self, monkeypatch): + """Some providers report None instead of 0 for fields they don't track.""" + m = MagicMock() + monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m) + + response = MagicMock() + response.usage.input_tokens = None + response.usage.output_tokens = None + response.usage.input_tokens_details.cached_tokens = None + response.usage.output_tokens_details.reasoning_tokens = None + + await LLMMetricsHooks().on_llm_end( + context=MagicMock(), + agent=_mock_agent(), + response=response, + ) + + attrs = {"model": "gpt-5"} + m.input_tokens.add.assert_called_once_with(0, attrs) + m.output_tokens.add.assert_called_once_with(0, attrs) + m.cached_input_tokens.add.assert_called_once_with(0, attrs) + m.reasoning_tokens.add.assert_called_once_with(0, attrs) + class TestRecordLLMFailure: def test_emits_classified_status(self, monkeypatch): From 56cd1f7ba0a76bd3652cb83f31967ba230547ef3 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Fri, 8 May 2026 02:48:21 -0400 Subject: [PATCH 9/9] fix: add @override on LLMMetricsHooks.on_llm_end for pyright --- src/agentex/lib/core/observability/llm_metrics_hooks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/agentex/lib/core/observability/llm_metrics_hooks.py b/src/agentex/lib/core/observability/llm_metrics_hooks.py index 64c5284ce..fce4b29ba 100644 --- a/src/agentex/lib/core/observability/llm_metrics_hooks.py +++ b/src/agentex/lib/core/observability/llm_metrics_hooks.py @@ -9,6 +9,7 @@ from __future__ import annotations from typing import Any +from typing_extensions import override from agents import Agent, RunHooks, ModelResponse, RunContextWrapper @@ -18,6 +19,7 @@ class LLMMetricsHooks(RunHooks): """Emits ``agentex.llm.requests`` + token counters on every LLM call.""" + @override async def on_llm_end( self, context: RunContextWrapper[Any],