Skip to content
Empty file.
121 changes: 121 additions & 0 deletions src/agentex/lib/core/observability/llm_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""OTel metrics for LLM calls.

Single source of truth for LLM-call instrumentation across all agentex code
paths — temporal+openai_agents streaming today, sync ACP and the Claude SDK
plugin in future PRs. Centralizing the instrument definitions here means
those follow-ups don't need to redefine the metric names, units, or
description strings; they import ``get_llm_metrics()`` and record values.

The meter is no-op when the application hasn't configured a ``MeterProvider``,
so importing this module is safe for runtimes that don't use OTel. Instruments
are created lazily on first ``get_llm_metrics()`` call so a ``MeterProvider``
configured *after* this module is imported still binds correctly.

Cardinality is bounded:
- All metrics carry only ``model`` (the LLM model name).
- ``requests`` additionally carries ``status``, drawn from a small fixed set
(see ``classify_status``).

Resource attributes (``service.name``, ``k8s.*``, etc.) come from the
application's OTel resource configuration and are added to every series
automatically.
"""

from __future__ import annotations

from typing import Optional

from opentelemetry import metrics


class LLMMetrics:
    """Lazily-created OTel instruments for LLM call telemetry."""

    def __init__(self) -> None:
        meter = metrics.get_meter("agentex.llm")

        # Thin wrappers so each instrument definition below reads as one
        # compact (name, unit, description) record.
        def counter(name: str, unit: str, description: str):
            return meter.create_counter(name=name, unit=unit, description=description)

        def histogram(name: str, unit: str, description: str):
            return meter.create_histogram(name=name, unit=unit, description=description)

        self.requests = counter(
            "agentex.llm.requests",
            "1",
            "LLM call count tagged with status (success / rate_limit / "
            "server_error / client_error / timeout / network_error / "
            "other_error). Use to alert on 429s, 5xxs, etc.",
        )
        self.ttft_ms = histogram(
            "agentex.llm.ttft",
            "ms",
            "Time from request submission to first content token (ms)",
        )
        # ttat (time-to-first-answering-token) differs from ttft on reasoning
        # models: ttft fires on the first reasoning chunk (which arrives
        # quickly), while ttat fires on the first user-visible answer token
        # (text or tool call). For non-reasoning models the two are equal.
        self.ttat_ms = histogram(
            "agentex.llm.ttat",
            "ms",
            "Time from request submission to first answering token (text or tool-call delta) — excludes reasoning chunks",
        )
        # NOTE: the TPS denominator is the model-generation window
        # (last_token_time - first_token_time), not total stream wall time.
        # This isolates raw model throughput from event-loop / tool-call
        # latency.
        self.tps = histogram(
            "agentex.llm.tps",
            "tokens/s",
            "Output tokens per second over the generation window",
        )
        self.input_tokens = counter(
            "agentex.llm.input_tokens",
            "tokens",
            "Total input tokens sent to the LLM",
        )
        self.output_tokens = counter(
            "agentex.llm.output_tokens",
            "tokens",
            "Total output tokens returned by the LLM",
        )
        self.cached_input_tokens = counter(
            "agentex.llm.cached_input_tokens",
            "tokens",
            "Subset of input tokens served from prompt cache",
        )
        self.reasoning_tokens = counter(
            "agentex.llm.reasoning_tokens",
            "tokens",
            "Output tokens spent on reasoning (subset of output_tokens)",
        )


_llm_metrics: Optional[LLMMetrics] = None


def get_llm_metrics() -> LLMMetrics:
    """Return the LLM metrics singleton, creating it on first use."""
    global _llm_metrics
    instance = _llm_metrics
    if instance is None:
        # First call: bind the instruments against whatever MeterProvider is
        # configured right now, and cache for all subsequent callers.
        instance = LLMMetrics()
        _llm_metrics = instance
    return instance


def classify_status(exc: Optional[BaseException]) -> str:
    """Categorize an LLM call's outcome into a small fixed set of status labels.

    A successful call returns ``"success"``. Exceptions are mapped by type name
    so we don't depend on a specific provider SDK's exception class hierarchy:
    OpenAI, Anthropic, and other providers all use names like ``RateLimitError``,
    ``APITimeoutError``, ``InternalServerError``, etc.

    Returns one of exactly seven labels (keeping metric cardinality bounded):
    ``success``, ``rate_limit``, ``timeout``, ``server_error``,
    ``network_error``, ``client_error``, ``other_error``.
    """
    if exc is None:
        return "success"
    name = type(exc).__name__
    # Order matters: e.g. "RateLimit" wins before the generic buckets below.
    if "RateLimit" in name:
        return "rate_limit"
    if "Timeout" in name:
        return "timeout"
    # "Overloaded" covers Anthropic's OverloadedError (HTTP 529), a
    # server-side capacity failure analogous to a 503.
    if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway", "Overloaded")):
        return "server_error"
    if "Connection" in name:
        return "network_error"
    if any(s in name for s in ("BadRequest", "Authentication", "Permission", "NotFound", "Conflict", "UnprocessableEntity")):
        return "client_error"
    return "other_error"
57 changes: 57 additions & 0 deletions src/agentex/lib/core/observability/llm_metrics_hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""``RunHooks`` adapter that emits per-call LLM metrics.

Used by the sync ACP path and as a base class for ``TemporalStreamingHooks``
on the async path, so token / request / cache metrics emit consistently
across both. Streaming-only metrics (ttft, ttat, tps) are emitted from the
streaming model itself, not here — hooks don't see individual chunks.
"""

from __future__ import annotations

from typing import Any
from typing_extensions import override

from agents import Agent, RunHooks, ModelResponse, RunContextWrapper

from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics


class LLMMetricsHooks(RunHooks):
    """Emits ``agentex.llm.requests`` + token counters on every LLM call."""

    @override
    async def on_llm_end(
        self,
        context: RunContextWrapper[Any],
        agent: Agent[Any],
        response: ModelResponse,
    ) -> None:
        """Record request and token metrics for a completed LLM call.

        All emission is best-effort: metrics failures must never propagate
        into the agent run that triggered the hook.
        """
        del context  # part of the RunHooks contract; unused here
        m = get_llm_metrics()
        attrs = {"model": str(agent.model) if agent.model else "unknown"}
        # Request counter only depends on agent.model, so emit it first and
        # independently of the usage extraction below.
        try:
            m.requests.add(1, {**attrs, "status": "success"})
        except Exception:
            pass
        # Token counters reach into nested optional fields and are best-effort:
        # a non-OpenAI provider (litellm-routed Anthropic, etc.) may return a
        # Usage shape missing input_tokens_details / output_tokens_details.
        # Each counter is emitted independently so one missing field doesn't
        # suppress the others (e.g. absent cache details must not drop
        # reasoning_tokens).
        usage = getattr(response, "usage", None)
        if usage is None:
            return
        try:
            m.input_tokens.add(usage.input_tokens or 0, attrs)
        except Exception:
            pass
        try:
            m.output_tokens.add(usage.output_tokens or 0, attrs)
        except Exception:
            pass
        try:
            m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs)
        except Exception:
            pass
        try:
            m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs)
        except Exception:
            pass


def record_llm_failure(model: str, exc: BaseException) -> None:
    """Best-effort counter bump for an LLM call that raised before ``on_llm_end``."""
    try:
        labels = {"model": model, "status": classify_status(exc)}
        get_llm_metrics().requests.add(1, labels)
    except Exception:
        # Metrics must never break the caller's error-handling path.
        pass
Empty file.
83 changes: 83 additions & 0 deletions src/agentex/lib/core/observability/tests/test_llm_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Tests for ``agentex.lib.core.observability.llm_metrics``."""

from __future__ import annotations

import agentex.lib.core.observability.llm_metrics as llm_metrics
from agentex.lib.core.observability.llm_metrics import (
LLMMetrics,
classify_status,
get_llm_metrics,
)


class TestClassifyStatus:
    """``classify_status`` maps exception type names onto fixed status labels."""

    @staticmethod
    def _exc(name: str) -> Exception:
        """Build a throwaway exception instance whose class carries *name*."""
        return type(name, (Exception,), {})()

    def test_none_is_success(self):
        assert classify_status(None) == "success"

    def test_rate_limit(self):
        assert classify_status(self._exc("RateLimitError")) == "rate_limit"

    def test_timeout(self):
        assert classify_status(self._exc("APITimeoutError")) == "timeout"

    def test_server_error(self):
        assert classify_status(self._exc("InternalServerError")) == "server_error"
        assert classify_status(self._exc("ServiceUnavailable")) == "server_error"

    def test_network_error(self):
        assert classify_status(self._exc("APIConnectionError")) == "network_error"

    def test_client_error(self):
        for cls_name in ("BadRequestError", "AuthenticationError", "PermissionError"):
            assert classify_status(self._exc(cls_name)) == "client_error"

    def test_unknown_falls_back(self):
        assert classify_status(self._exc("WeirdProviderException")) == "other_error"


class TestGetLLMMetrics:
    """Lazy-singleton behavior of ``get_llm_metrics``."""

    @staticmethod
    def _reset_singleton(monkeypatch):
        """Clear the module-level cache so each test exercises first-use creation."""
        monkeypatch.setattr(llm_metrics, "_llm_metrics", None)

    def test_returns_llm_metrics_instance(self, monkeypatch):
        self._reset_singleton(monkeypatch)
        assert isinstance(get_llm_metrics(), LLMMetrics)

    def test_singleton_returns_same_instance(self, monkeypatch):
        self._reset_singleton(monkeypatch)
        assert get_llm_metrics() is get_llm_metrics()

    def test_instruments_exist(self, monkeypatch):
        self._reset_singleton(monkeypatch)
        metrics_obj = get_llm_metrics()
        expected_instruments = (
            "requests",
            "ttft_ms",
            "ttat_ms",
            "tps",
            "input_tokens",
            "output_tokens",
            "cached_input_tokens",
            "reasoning_tokens",
        )
        for attr in expected_instruments:
            assert hasattr(metrics_obj, attr), f"missing instrument: {attr}"
Loading
Loading