121 changes: 121 additions & 0 deletions src/agentex/lib/core/observability/llm_metrics.py
@@ -0,0 +1,121 @@
"""OTel metrics for LLM calls.

Single source of truth for LLM-call instrumentation across all agentex code
paths — temporal+openai_agents streaming today, sync ACP and the Claude SDK
plugin in future PRs. Centralizing the instrument definitions here means
those follow-ups don't need to redefine the metric names, units, or
description strings; they import ``get_llm_metrics()`` and record values.

The meter is no-op when the application hasn't configured a ``MeterProvider``,
so importing this module is safe for runtimes that don't use OTel. Instruments
are created lazily on first ``get_llm_metrics()`` call so a ``MeterProvider``
configured *after* this module is imported still binds correctly.

Cardinality is bounded:
- All metrics carry only ``model`` (the LLM model name).
- ``requests`` additionally carries ``status``, drawn from a small fixed set
(see ``classify_status``).

Resource attributes (``service.name``, ``k8s.*``, etc.) come from the
application's OTel resource configuration and are added to every series
automatically.
"""

from __future__ import annotations

from typing import Optional

from opentelemetry import metrics


class LLMMetrics:
"""Lazily-created OTel instruments for LLM call telemetry."""

def __init__(self) -> None:
meter = metrics.get_meter("agentex.llm")
self.requests = meter.create_counter(
name="agentex.llm.requests",
unit="1",
description=(
"LLM call count tagged with status (success / rate_limit / "
"server_error / client_error / timeout / network_error / "
"other_error). Use to alert on 429s, 5xxs, etc."
),
)
self.ttft_ms = meter.create_histogram(
name="agentex.llm.ttft",
unit="ms",
description="Time from request submission to first content token (ms)",
)
# ttat (time-to-first-answering-token) is distinct from ttft for reasoning
# models: ttft fires on the first reasoning chunk (which arrives quickly),
# while ttat fires on the first user-visible answer token (text or tool
# call). For non-reasoning models the two are equal.
self.ttat_ms = meter.create_histogram(
name="agentex.llm.ttat",
unit="ms",
description="Time from request submission to first answering token (text or tool-call delta) — excludes reasoning chunks",
)
# Note: TPS denominator is the model-generation window
# (last_token_time - first_token_time), not total stream wall time.
# This isolates raw model throughput from event-loop / tool-call latency.
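        # Example: 480 output tokens with the first token at t=2s and the
        # last at t=10s record as 480 / 8 = 60 tokens/s, regardless of how
        # long the surrounding stream took end to end.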
self.tps = meter.create_histogram(
name="agentex.llm.tps",
unit="tokens/s",
description="Output tokens per second over the generation window",
)
self.input_tokens = meter.create_counter(
name="agentex.llm.input_tokens",
unit="tokens",
description="Total input tokens sent to the LLM",
)
self.output_tokens = meter.create_counter(
name="agentex.llm.output_tokens",
unit="tokens",
description="Total output tokens returned by the LLM",
)
self.cached_input_tokens = meter.create_counter(
name="agentex.llm.cached_input_tokens",
unit="tokens",
description="Subset of input tokens served from prompt cache",
)
self.reasoning_tokens = meter.create_counter(
name="agentex.llm.reasoning_tokens",
unit="tokens",
description="Output tokens spent on reasoning (subset of output_tokens)",
)


_llm_metrics: Optional[LLMMetrics] = None


def get_llm_metrics() -> LLMMetrics:
"""Return the LLM metrics singleton, creating it on first use."""
global _llm_metrics
if _llm_metrics is None:
_llm_metrics = LLMMetrics()
return _llm_metrics


def classify_status(exc: Optional[BaseException]) -> str:
"""Categorize an LLM call's outcome into a small fixed set of status labels.

A successful call returns ``"success"``. Exceptions are mapped by type name
so we don't depend on a specific provider SDK's exception class hierarchy:
OpenAI, Anthropic, and other providers all use names like ``RateLimitError``,
``APITimeoutError``, ``InternalServerError``, etc.
"""
if exc is None:
return "success"
name = type(exc).__name__
if "RateLimit" in name:
return "rate_limit"
if "Timeout" in name:
return "timeout"
if any(s in name for s in ("ServerError", "InternalServer", "ServiceUnavailable", "BadGateway")):
return "server_error"
if "Connection" in name:
return "network_error"
if any(s in name for s in ("BadRequest", "Authentication", "Permission", "NotFound", "Conflict", "UnprocessableEntity")):
return "client_error"
return "other_error"
46 changes: 46 additions & 0 deletions src/agentex/lib/core/observability/llm_metrics_hooks.py
@@ -0,0 +1,46 @@
"""``RunHooks`` adapter that emits per-call LLM metrics.

Used by the sync ACP path and as a base class for ``TemporalStreamingHooks``
on the async path, so token / request / cache metrics emit consistently
across both. Streaming-only metrics (ttft, ttat, tps) are emitted from the
streaming model itself, not here — hooks don't see individual chunks.
"""

from __future__ import annotations

from typing import Any

from agents import Agent, RunHooks, ModelResponse, RunContextWrapper

from agentex.lib.core.observability.llm_metrics import classify_status, get_llm_metrics


class LLMMetricsHooks(RunHooks):
"""Emits ``agentex.llm.requests`` + token counters on every LLM call."""

async def on_llm_end(
self,
context: RunContextWrapper[Any],
agent: Agent[Any],
response: ModelResponse,
) -> None:
del context # part of the RunHooks contract; unused here
m = get_llm_metrics()
attrs = {"model": str(agent.model) if agent.model else "unknown"}
try:
usage = response.usage
m.requests.add(1, {**attrs, "status": "success"})
m.input_tokens.add(usage.input_tokens or 0, attrs)
m.output_tokens.add(usage.output_tokens or 0, attrs)
m.cached_input_tokens.add(usage.input_tokens_details.cached_tokens or 0, attrs)
m.reasoning_tokens.add(usage.output_tokens_details.reasoning_tokens or 0, attrs)
except Exception:
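            # Metrics are best-effort: instrumentation must never break the
            # agent's control flow, so swallow exporter/shape errors here.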
pass


def record_llm_failure(model: str, exc: BaseException) -> None:
"""Best-effort counter bump for an LLM call that raised before ``on_llm_end``."""
try:
get_llm_metrics().requests.add(1, {"model": model, "status": classify_status(exc)})
except Exception:
pass
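
A minimal wiring sketch (illustrative, not part of this diff), assuming the ``openai-agents`` ``Runner.run(..., hooks=...)`` entry point; the agent name, instructions, and prompt are placeholders:

from agents import Agent, Runner

from agentex.lib.core.observability.llm_metrics_hooks import LLMMetricsHooks, record_llm_failure


async def run_with_metrics() -> None:
    agent = Agent(name="assistant", instructions="Be helpful.", model="gpt-5")
    try:
        # on_llm_end fires after each LLM call inside the run and emits the
        # request/token counters.
        result = await Runner.run(agent, "hello", hooks=LLMMetricsHooks())
        print(result.final_output)
    except Exception as exc:
        # Calls that raise before on_llm_end still get counted, with a
        # classified status label.
        record_llm_failure("gpt-5", exc)
        raise
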
83 changes: 83 additions & 0 deletions src/agentex/lib/core/observability/tests/test_llm_metrics.py
@@ -0,0 +1,83 @@
"""Tests for ``agentex.lib.core.observability.llm_metrics``."""

from __future__ import annotations

import agentex.lib.core.observability.llm_metrics as llm_metrics
from agentex.lib.core.observability.llm_metrics import (
LLMMetrics,
classify_status,
get_llm_metrics,
)


class TestClassifyStatus:
def test_none_is_success(self):
assert classify_status(None) == "success"

def test_rate_limit(self):
class RateLimitError(Exception):
pass

assert classify_status(RateLimitError()) == "rate_limit"

def test_timeout(self):
class APITimeoutError(Exception):
pass

assert classify_status(APITimeoutError()) == "timeout"

def test_server_error(self):
class InternalServerError(Exception):
pass

assert classify_status(InternalServerError()) == "server_error"

class ServiceUnavailable(Exception):
pass

assert classify_status(ServiceUnavailable()) == "server_error"

def test_network_error(self):
class APIConnectionError(Exception):
pass

assert classify_status(APIConnectionError()) == "network_error"

def test_client_error(self):
for cls_name in ("BadRequestError", "AuthenticationError", "PermissionError"):
cls = type(cls_name, (Exception,), {})
assert classify_status(cls()) == "client_error"

def test_unknown_falls_back(self):
class WeirdProviderException(Exception):
pass

assert classify_status(WeirdProviderException()) == "other_error"


class TestGetLLMMetrics:
def test_returns_llm_metrics_instance(self, monkeypatch):
monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
m = get_llm_metrics()
assert isinstance(m, LLMMetrics)

def test_singleton_returns_same_instance(self, monkeypatch):
monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
first = get_llm_metrics()
second = get_llm_metrics()
assert first is second

def test_instruments_exist(self, monkeypatch):
monkeypatch.setattr(llm_metrics, "_llm_metrics", None)
m = get_llm_metrics()
for name in (
"requests",
"ttft_ms",
"ttat_ms",
"tps",
"input_tokens",
"output_tokens",
"cached_input_tokens",
"reasoning_tokens",
):
assert hasattr(m, name), f"missing instrument: {name}"
135 changes: 135 additions & 0 deletions src/agentex/lib/core/observability/tests/test_llm_metrics_hooks.py
@@ -0,0 +1,135 @@
"""Tests for ``agentex.lib.core.observability.llm_metrics_hooks``."""

from __future__ import annotations

from unittest.mock import MagicMock

import pytest

import agentex.lib.core.observability.llm_metrics_hooks as hooks_module
from agentex.lib.core.observability.llm_metrics_hooks import (
LLMMetricsHooks,
record_llm_failure,
)


def _mock_response(
*,
input_tokens: int = 100,
output_tokens: int = 50,
cached_tokens: int = 30,
reasoning_tokens: int = 10,
) -> MagicMock:
response = MagicMock()
response.usage.input_tokens = input_tokens
response.usage.output_tokens = output_tokens
response.usage.input_tokens_details.cached_tokens = cached_tokens
response.usage.output_tokens_details.reasoning_tokens = reasoning_tokens
return response


def _mock_agent(model: str = "gpt-5") -> MagicMock:
agent = MagicMock()
agent.model = model
return agent


class TestLLMMetricsHooksOnLLMEnd:
@pytest.mark.asyncio
async def test_emits_success_request_counter(self, monkeypatch):
m = MagicMock()
monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

await LLMMetricsHooks().on_llm_end(
context=MagicMock(),
agent=_mock_agent("gpt-5"),
response=_mock_response(),
)

m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "success"})

@pytest.mark.asyncio
async def test_emits_token_counters(self, monkeypatch):
m = MagicMock()
monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

await LLMMetricsHooks().on_llm_end(
context=MagicMock(),
agent=_mock_agent("gpt-5"),
response=_mock_response(
input_tokens=200,
output_tokens=75,
cached_tokens=50,
reasoning_tokens=20,
),
)

attrs = {"model": "gpt-5"}
m.input_tokens.add.assert_called_once_with(200, attrs)
m.output_tokens.add.assert_called_once_with(75, attrs)
m.cached_input_tokens.add.assert_called_once_with(50, attrs)
m.reasoning_tokens.add.assert_called_once_with(20, attrs)

@pytest.mark.asyncio
async def test_zero_tokens_emit_zero_not_skip(self, monkeypatch):
m = MagicMock()
monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

await LLMMetricsHooks().on_llm_end(
context=MagicMock(),
agent=_mock_agent(),
response=_mock_response(input_tokens=0, output_tokens=0, cached_tokens=0, reasoning_tokens=0),
)

m.input_tokens.add.assert_called_once_with(0, {"model": "gpt-5"})
m.output_tokens.add.assert_called_once_with(0, {"model": "gpt-5"})

@pytest.mark.asyncio
async def test_unknown_model_falls_back(self, monkeypatch):
m = MagicMock()
monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

agent = MagicMock()
agent.model = None

await LLMMetricsHooks().on_llm_end(
context=MagicMock(),
agent=agent,
response=_mock_response(),
)

m.requests.add.assert_called_once_with(1, {"model": "unknown", "status": "success"})

@pytest.mark.asyncio
async def test_swallows_exporter_failure(self, monkeypatch):
m = MagicMock()
m.requests.add.side_effect = RuntimeError("exporter exploded")
monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

# Should not raise — caller's flow must not break on metric failure.
await LLMMetricsHooks().on_llm_end(
context=MagicMock(),
agent=_mock_agent(),
response=_mock_response(),
)


class TestRecordLLMFailure:
def test_emits_classified_status(self, monkeypatch):
m = MagicMock()
monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

class RateLimitError(Exception):
pass

record_llm_failure("gpt-5", RateLimitError())

m.requests.add.assert_called_once_with(1, {"model": "gpt-5", "status": "rate_limit"})

def test_swallows_exporter_failure(self, monkeypatch):
m = MagicMock()
m.requests.add.side_effect = RuntimeError("exporter exploded")
monkeypatch.setattr(hooks_module, "get_llm_metrics", lambda: m)

# Should not raise.
record_llm_failure("gpt-5", Exception("upstream"))