From 37a2e2a6a3dfb56d3cb53a3d5cff64c4a7e07b60 Mon Sep 17 00:00:00 2001 From: Season Date: Sat, 18 Apr 2026 21:39:01 +0800 Subject: [PATCH 1/3] feat(ai-openrouter): capture per-request cost from chat responses Capture OpenRouter's per-request cost from the SSE chat response and surface it on UsageTotals.costDetails so callers can track spend. Implementation notes: - Cost capture is isolated per-request so concurrent chats don't cross-contaminate; SSE parsing matches separators directly (handles LF and CRLF) and matches the content-type case-insensitively. - Skipped on non-streaming chat responses to avoid needless overhead. - Omits usage entirely when token counts are missing, even if a cost was captured, so downstream consumers never see partial usage. - Does not downgrade a RUN_FINISHED event to RUN_ERROR on a late stream abort; the deferral trade-off is documented in the adapter. - Flushes the trailing SSE frame and resolves no-cost takes fast. --- .changeset/openrouter-cost-tracking.md | 11 + README.md | 1 + docs/adapters/openrouter.md | 26 + .../src/adapters/cost-capture.ts | 351 +++++++++++++ .../ai-openrouter/src/adapters/text.ts | 145 ++++-- .../ai-openrouter/tests/cost-capture.test.ts | 481 ++++++++++++++++++ .../tests/openrouter-adapter.test.ts | 278 +++++++++- .../src/activities/chat/middleware/types.ts | 25 +- packages/typescript/ai/src/types.ts | 36 +- 9 files changed, 1284 insertions(+), 70 deletions(-) create mode 100644 .changeset/openrouter-cost-tracking.md create mode 100644 packages/typescript/ai-openrouter/src/adapters/cost-capture.ts create mode 100644 packages/typescript/ai-openrouter/tests/cost-capture.test.ts diff --git a/.changeset/openrouter-cost-tracking.md b/.changeset/openrouter-cost-tracking.md new file mode 100644 index 000000000..c219d61fc --- /dev/null +++ b/.changeset/openrouter-cost-tracking.md @@ -0,0 +1,11 @@ +--- +'@tanstack/ai-openrouter': minor +'@tanstack/ai': minor +--- + +- Add OpenRouter cost tracking. 
The `OpenRouterTextAdapter` now attaches the authoritative per-request USD cost to the `RUN_FINISHED` event under `usage.cost`, along with `usage.costDetails.upstreamInferenceCost` and `usage.costDetails.cacheDiscount` when present. Cost is sourced from OpenRouter's chat completion response itself (the field arrives in the trailing SSE chunk), so there is no extra HTTP request and no added end-of-stream latency. +- Cost is always sourced from OpenRouter — never computed locally from token counts and a price table — because OpenRouter routes the same model id to different upstream providers (primary, fallback, BYOK) with different pricing, and applies cache discounts the SDK cannot reconstruct. +- Cost is captured via the SDK's public `HTTPClient` response hook. The hook calls `Response.clone()` and parses the cloned body out-of-band to recover `usage.cost` and `usage.cost_details`, which the @openrouter/sdk Zod parser would otherwise strip (the schema doesn't declare those fields). The SDK's stream consumer is unaffected — both clones are read independently. +- Custom `httpClient` values passed into the adapter are preserved: the adapter clones the caller's client (inheriting their fetcher, retries, tracing, and any pre-registered hooks) and appends the cost-capture hook to the clone, so the caller's original instance is never mutated and cost tracking still works when callers supply their own transport. +- Defer the OpenRouter `RUN_FINISHED` emission until after the upstream stream fully drains, so token usage that arrives in a trailing usage-only chunk (the common case for OpenAI-compatible providers, where the final chunk has empty `choices`) is included in `usage` instead of being dropped. +- Extend `RunFinishedEvent.usage` in `@tanstack/ai` with optional `cost` and `costDetails` fields. The middleware `UsageInfo` (consumed by `onUsage`) and `FinishInfo.usage` (consumed by `onFinish`) carry the same fields, so middleware authors can read cost without casts. 
The change is additive and backwards-compatible for adapters that don't populate cost. diff --git a/README.md b/README.md index cd8d92350..87907eca9 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ A powerful, type-safe AI SDK for building AI-powered applications. - Isomorphic type-safe tools with server/client execution - **Enhanced integration with TanStack Start** - Share implementations between AI tools and server functions - **Observability events** - Structured, typed events for text, tools, image, speech, transcription, and video ([docs](./docs/guides/observability.md)) +- **Cost tracking** - Per-request USD cost on `RUN_FINISHED` for providers that report it (currently [OpenRouter](./docs/adapters/openrouter.md#cost-tracking)) ### Read the docs → diff --git a/docs/adapters/openrouter.md b/docs/adapters/openrouter.md index c61fcff96..5442e8746 100644 --- a/docs/adapters/openrouter.md +++ b/docs/adapters/openrouter.md @@ -115,6 +115,32 @@ Set your API key in environment variables: OPENROUTER_API_KEY=sk-or-... ``` +## Cost Tracking + +The OpenRouter adapter attaches the authoritative per-request cost to the `RUN_FINISHED` event under `usage.cost` (USD). OpenRouter [reports cost inline in every chat response](https://openrouter.ai/docs/use-cases/usage-accounting), so cost arrives in the same SSE stream as the model output — there is **no extra HTTP request** and **no added latency**. + +Why we don't compute cost locally from tokens × price: OpenRouter routes the same model id to different upstream providers (primary, fallback, BYOK), each with different pricing, plus applies cache discounts and BYOK upstream costs. A static price table would silently drift and produce wrong numbers. + +```typescript +import { chat } from "@tanstack/ai"; +import { openRouterText } from "@tanstack/ai-openrouter"; + +const stream = chat({ + adapter: openRouterText("openai/gpt-5"), + messages: [{ role: "user", content: "Hello!" 
}], +}); + +for await (const chunk of stream) { + if (chunk.type === "RUN_FINISHED") { + console.log("USD cost:", chunk.usage?.cost); + console.log("Upstream inference cost:", chunk.usage?.costDetails?.upstreamInferenceCost); + console.log("Cache discount:", chunk.usage?.costDetails?.cacheDiscount); + } +} +``` + +If a particular response doesn't include cost (rare — for example if a future OpenRouter response shape change moves the field), `RUN_FINISHED` still emits with token usage; only the `cost` field is omitted. Cost tracking will never break a chat stream. + ## Model Routing OpenRouter can automatically route requests to the best available provider: diff --git a/packages/typescript/ai-openrouter/src/adapters/cost-capture.ts b/packages/typescript/ai-openrouter/src/adapters/cost-capture.ts new file mode 100644 index 000000000..e5eb564ad --- /dev/null +++ b/packages/typescript/ai-openrouter/src/adapters/cost-capture.ts @@ -0,0 +1,351 @@ +import { HTTPClient } from '@openrouter/sdk' + +export interface CostInfo { + cost?: number + costDetails?: { + upstreamInferenceCost?: number | null + cacheDiscount?: number | null + } +} + +interface CostEntry { + info: CostInfo + timer: ReturnType +} + +interface ParseEntry { + parse: Promise + timer?: ReturnType +} + +/** + * Per-response cost cache, keyed by upstream response id. + * + * The chat-completion response carries `usage.cost` in its trailing chunk, + * but the @openrouter/sdk Zod parser strips it (the schema doesn't declare + * the field, and Zod defaults to strip mode). We clone the response inside + * the SDK's own response hook, parse our copy out-of-band, and stash cost + * here so the adapter can read it after the SDK-parsed stream ends. 
+ * + * The hook runs the parse as a fire-and-forget Promise, so by the time the + * adapter's `for await` loop exits, `store.set(id, cost)` may not yet have + * run — there's no direct happens-before relationship between the SDK's + * stream consumer and the tee'd parse reader. `take(id)` awaits **only the + * parse that has announced this id** (via `announceId`), so overlapping + * `chat.send` calls on the same adapter cannot block each other's cost + * delivery. If no parse has announced the id yet, `take(id)` waits for the + * next announcement or for all currently-pending parses to settle, + * whichever comes first. + * + * Entries also auto-expire so a missing read (errored stream, missing id, + * etc.) cannot leak memory across long-lived adapters. + */ +export class CostStore { + private entries = new Map() + private unannouncedParses = new Set>() + private idToParse = new Map() + private announcementWaiters = new Set<() => void>() + private readonly ttlMs: number + + constructor(ttlMs = 60_000) { + this.ttlMs = ttlMs + } + + set(id: string, info: CostInfo): void { + const existing = this.entries.get(id) + if (existing) clearTimeout(existing.timer) + const timer = setTimeout(() => this.entries.delete(id), this.ttlMs) + // In browser/Deno runtimes `setTimeout` returns a number — `'unref' in + // ` would throw. Gate on object-ness before probing for unref(). + if (typeof timer === 'object' && 'unref' in timer) timer.unref() + this.entries.set(id, { info, timer }) + } + + recordParse(parse: Promise): void { + this.unannouncedParses.add(parse) + parse.finally(() => this.unannouncedParses.delete(parse)) + } + + announceId(id: string, parse: Promise): void { + // Once a parse has bound to an id, it's no longer a fallback candidate + // for other ids — removing it here prevents head-of-line blocking where + // an id with no matching parse would otherwise await this (unrelated) + // stream as part of the "current wave drained" fallback. 
+ this.unannouncedParses.delete(parse) + const existing = this.idToParse.get(id) + if (existing?.timer) clearTimeout(existing.timer) + const entry: ParseEntry = { parse } + this.idToParse.set(id, entry) + // After the parse settles, keep the entry around briefly so a late + // `take(id)` can resolve without falling through to the pending-parses + // wait (which would reintroduce head-of-line blocking on unrelated + // concurrent streams — especially for responses that announce an id + // but never produce `usage.cost`). TTL eventually drops it. + parse.finally(() => { + const timer = setTimeout(() => { + if (this.idToParse.get(id) === entry) this.idToParse.delete(id) + }, this.ttlMs) + if (typeof timer === 'object' && 'unref' in timer) timer.unref() + entry.timer = timer + }) + // Wake every `take()` that was parked waiting for *some* announcement — + // the ones whose id doesn't match will fall back into the wait loop. + const waiters = [...this.announcementWaiters] + this.announcementWaiters.clear() + for (const w of waiters) w() + } + + async take(id: string): Promise { + // Fast path: the tee'd parser typically finishes before the SDK's + // stream consumer exits (less work per chunk than the Zod pipeline), + // so skip the wait entirely when the entry is already populated. + const preset = this.entries.get(id) + if (preset) { + clearTimeout(preset.timer) + this.entries.delete(id) + return preset.info + } + // Slow path: prefer per-id matching so an unrelated long-running stream + // on the same adapter cannot delay our RUN_FINISHED. If the matching + // parse hasn't announced yet, park on the next announcement or on the + // current wave of parses draining (whichever happens first), then loop. 
+ for (;;) { + const match = this.idToParse.get(id) + if (match) { + await match.parse.catch(() => {}) + if (match.timer) clearTimeout(match.timer) + if (this.idToParse.get(id) === match) this.idToParse.delete(id) + break + } + if (this.unannouncedParses.size === 0) break + let resolveSignal: () => void = () => {} + const nextAnnouncement = new Promise((resolve) => { + resolveSignal = resolve + this.announcementWaiters.add(resolve) + }) + const currentWaveDrained = Promise.allSettled([ + ...this.unannouncedParses, + ]).then(() => {}) + await Promise.race([nextAnnouncement, currentWaveDrained]) + this.announcementWaiters.delete(resolveSignal) + } + const entry = this.entries.get(id) + if (!entry) return undefined + clearTimeout(entry.timer) + this.entries.delete(id) + return entry.info + } + + clear(): void { + for (const { timer } of this.entries.values()) clearTimeout(timer) + for (const entry of this.idToParse.values()) { + if (entry.timer) clearTimeout(entry.timer) + } + this.entries.clear() + this.unannouncedParses.clear() + this.idToParse.clear() + this.announcementWaiters.clear() + } +} + +/** + * Returns a response hook for the SDK's public `HTTPClient` that captures + * `usage.cost` and `usage.cost_details` from chat-completion responses + * without consuming the SDK's body. + * + * `Response.clone()` tees the body internally — our consumer reads one + * branch while the SDK reads the other exactly as if no interception + * happened. Any parse error is silently swallowed because cost is a + * secondary signal and must not break the chat stream. + */ +export function createCostCaptureHook( + store: CostStore, +): (res: Response, req: Request) => void { + return (res, req) => { + // Cheap header check first: `structuredOutput()` calls `/chat/completions` + // with `stream: false` and returns JSON — matching URL before content-type + // would pay a `new URL()` parse on every one of those. 
+ // Content-Type is case-insensitive per RFC 9110; normalize before matching. + const contentType = (res.headers.get('content-type') ?? '').toLowerCase() + if (!contentType.includes('text/event-stream')) return + if (!isChatCompletionsRequest(req.url)) return + if (!res.body) return + let copy: Response + try { + copy = res.clone() + } catch { + // A preceding response hook consumed the body (e.g. read `res.text()` + // for logging). Cloning now throws `TypeError: Body already read`; + // give up on cost rather than bubbling a failure that the SDK's hook + // loop would surface as a whole-request error. + return + } + if (!copy.body) return + // Announce the id the moment the parser sees it so concurrent `take(id)` + // calls can await only *this* parse instead of every in-flight one. + // Forward-ref: the parser needs a callback that references `parse`, but + // `parse` isn't bound yet at construction time. The stable `onId` + // trampoline delegates to `announce`, which we swap in right after + // `parse` is assigned — the parser only fires `onId` after its first + // async read, by which point the swap has happened. + let announce: (id: string) => void = () => {} + const onId = (id: string) => announce(id) + const parse = parseSseAndStore(copy.body, store, onId).catch(() => {}) + announce = (id) => store.announceId(id, parse) + store.recordParse(parse) + } +} + +/** + * Returns an `HTTPClient` with cost capture attached. When `client` is + * provided, clones it — the clone inherits the caller's fetcher and any + * hooks they registered, and our hook is appended. We never mutate the + * caller's instance, so they can reuse it for unrelated SDK calls without + * picking up our cost-capture behavior as a side effect. + */ +export function attachCostCapture( + store: CostStore, + client?: HTTPClient, +): HTTPClient { + const wrapped = client ? 
client.clone() : new HTTPClient() + wrapped.addHook('response', createCostCaptureHook(store)) + return wrapped +} + +function isChatCompletionsRequest(url: string): boolean { + // Match against pathname only so a query string like `?next=/chat/completions` + // on an unrelated endpoint doesn't trip the hook. The SDK sends absolute + // URLs today, but a custom `Fetcher` could pass something else — fail closed + // rather than letting a parse error bubble into the SDK's hook chain. + try { + return /\/chat\/completions(?:\/|$)/.test(new URL(url).pathname) + } catch { + return false + } +} + +// Event separators per SSE spec: a blank line between events may use any of +// `\n\n`, `\r\n\r\n`, or `\r\r`. Matching all three directly (instead of +// pre-normalizing line endings) avoids a chunk-boundary bug where a lone +// `\r` at the end of one read would be rewritten to `\n` before the paired +// `\n` of a CRLF landed in the next read, producing a false `\n\n` frame +// split in the middle of a line. +const SSE_EVENT_SEPARATOR = /\r\n\r\n|\r\r|\n\n/ +// Split on any single-line terminator per SSE spec (`\r\n`, `\r`, or `\n`) +// so we don't leave a trailing `\r` in the payload when servers use CRLF +// inside a frame. Hoisted out of `extractDataPayload` so the regex isn't +// recompiled on every event parse. 
+const SSE_LINE_TERMINATOR = /\r\n|\r|\n/ + +async function parseSseAndStore( + body: ReadableStream, + store: CostStore, + onId?: (id: string) => void, +): Promise { + const reader = body.getReader() + const decoder = new TextDecoder() + let buffer = '' + let responseId: string | undefined + let cost: CostInfo | undefined + const applyEvent = (event: string): void => { + const payload = extractDataPayload(event) + if (!payload || payload === '[DONE]') return + const parsed = safeParseJson(payload) + if (!parsed) return + if (!responseId && typeof parsed.id === 'string') { + responseId = parsed.id + onId?.(responseId) + } + const usage = parsed.usage as Record | undefined + if (usage) { + const extracted = extractCostFromUsage(usage) + if (extracted) cost = extracted + } + } + try { + for (;;) { + const { value, done } = await reader.read() + buffer += decoder.decode(value, { stream: true }) + // Network reads may split SSE events or batch several together — drain + // separator-delimited frames and keep any trailing partial in `buffer` + // for the next read so we never parse a half-received JSON payload. + for (;;) { + const match = SSE_EVENT_SEPARATOR.exec(buffer) + if (!match) break + const event = buffer.slice(0, match.index) + buffer = buffer.slice(match.index + match[0].length) + applyEvent(event) + } + if (done) { + // EOF-terminated SSE can legitimately omit the final separator + // (especially through proxies). Flush whatever is left as a final + // event so the trailing usage chunk isn't silently dropped. + if (buffer.length > 0) applyEvent(buffer) + break + } + // Cost arrives in the trailing usage chunk, after which there's no + // more data we care about. Cancel our clone early so the upstream + // body can be GC'd; the SDK's clone half is unaffected. 
+ if (responseId && cost) { + await reader.cancel().catch(() => {}) + break + } + } + } finally { + reader.releaseLock() + } + if (responseId && cost) store.set(responseId, cost) +} + +function extractDataPayload(event: string): string | undefined { + const lines: Array = [] + for (const line of event.split(SSE_LINE_TERMINATOR)) { + if (!line.startsWith('data:')) continue + lines.push(line.slice(5).replace(/^ /, '')) + } + if (!lines.length) return undefined + return lines.join('\n') +} + +function safeParseJson(text: string): Record | undefined { + try { + const v = JSON.parse(text) + return v && typeof v === 'object' ? (v as Record) : undefined + } catch { + return undefined + } +} + +function pickNumberOrNull( + obj: Record | undefined, + key: string, +): number | null | undefined { + if (!obj) return undefined + const v = obj[key] + if (typeof v === 'number') return v + if (v === null) return null + return undefined +} + +function extractCostFromUsage( + usage: Record, +): CostInfo | undefined { + // `cost` is the authoritative per-request USD total. Emitting `costDetails` + // without it would surface a breakdown that can't be reconciled against a + // total — callers could misread it as the bill. + const cost = typeof usage.cost === 'number' ? 
usage.cost : undefined + if (cost === undefined) return undefined + const details = usage.cost_details as Record | undefined + const upstream = pickNumberOrNull(details, 'upstream_inference_cost') + const cacheDiscount = pickNumberOrNull(details, 'cache_discount') + const hasDetails = upstream !== undefined || cacheDiscount !== undefined + return { + cost, + ...(hasDetails && { + costDetails: { + ...(upstream !== undefined && { upstreamInferenceCost: upstream }), + ...(cacheDiscount !== undefined && { cacheDiscount }), + }, + }), + } +} diff --git a/packages/typescript/ai-openrouter/src/adapters/text.ts b/packages/typescript/ai-openrouter/src/adapters/text.ts index 29427171c..3ce84812a 100644 --- a/packages/typescript/ai-openrouter/src/adapters/text.ts +++ b/packages/typescript/ai-openrouter/src/adapters/text.ts @@ -7,6 +7,8 @@ import { getOpenRouterApiKeyFromEnv, generateId as utilGenerateId, } from '../utils' +import { CostStore, attachCostCapture } from './cost-capture' +import type { CostInfo } from './cost-capture' import type { SDKOptions } from '@openrouter/sdk' import type { OPENROUTER_CHAT_MODELS, @@ -21,6 +23,7 @@ import type { import type { ContentPart, ModelMessage, + RunFinishedEvent, StreamChunk, TextOptions, } from '@tanstack/ai' @@ -86,7 +89,7 @@ interface AGUIState { deferredUsage: | { promptTokens: number; completionTokens: number; totalTokens: number } | undefined - computedFinishReason: string | undefined + computedFinishReason: RunFinishedEvent['finishReason'] | undefined } export class OpenRouterTextAdapter< @@ -104,10 +107,18 @@ export class OpenRouterTextAdapter< readonly name = 'openrouter' as const private client: OpenRouter + private costStore: CostStore constructor(config: OpenRouterConfig, model: TModel) { super({}, model) - this.client = new OpenRouter(config) + this.costStore = new CostStore() + // Wrap the caller's HTTPClient (if any) by cloning and appending a + // response hook — their fetcher, retries, tracing, and pre-registered 
+ // hooks are preserved, but their instance stays untouched. + this.client = new OpenRouter({ + ...config, + httpClient: attachCostCapture(this.costStore, config.httpClient), + }) } async *chatStream( @@ -153,65 +164,88 @@ export class OpenRouterTextAdapter< if (chunk.id) responseId = chunk.id if (chunk.model) currentModel = chunk.model - // Emit RUN_STARTED on first chunk - if (!aguiState.hasEmittedRunStarted) { - aguiState.hasEmittedRunStarted = true - yield asChunk({ - type: 'RUN_STARTED', - runId: aguiState.runId, - threadId: aguiState.threadId, - model: currentModel || options.model, - timestamp, - }) - } + // Emit RUN_STARTED on first chunk + if (!aguiState.hasEmittedRunStarted) { + aguiState.hasEmittedRunStarted = true + yield asChunk({ + type: 'RUN_STARTED', + runId: aguiState.runId, + threadId: aguiState.threadId, + model: currentModel || options.model, + timestamp, + }) + } - if (chunk.error) { - // Emit AG-UI RUN_ERROR - yield asChunk({ - type: 'RUN_ERROR', - runId: aguiState.runId, - model: currentModel || options.model, - timestamp, - message: chunk.error.message || 'Unknown error', - code: String(chunk.error.code), - error: { + if (chunk.error) { + // Emit AG-UI RUN_ERROR + yield asChunk({ + type: 'RUN_ERROR', + runId: aguiState.runId, + model: currentModel || options.model, + timestamp, message: chunk.error.message || 'Unknown error', code: String(chunk.error.code), - }, - }) - continue - } + error: { + message: chunk.error.message || 'Unknown error', + code: String(chunk.error.code), + }, + }) + continue + } - for (const choice of chunk.choices) { - yield* this.processChoice( - choice, - toolCallBuffers, - { - id: responseId || this.generateId(), - model: currentModel, - timestamp, - }, - { reasoning: accumulatedReasoning, content: accumulatedContent }, - (r, c) => { - accumulatedReasoning = r - accumulatedContent = c - }, - chunk.usage, - aguiState, - ) - } + for (const choice of chunk.choices) { + yield* this.processChoice( + choice, + 
toolCallBuffers, + { + id: responseId || this.generateId(), + model: currentModel, + timestamp, + }, + { reasoning: accumulatedReasoning, content: accumulatedContent }, + (r, c) => { + accumulatedReasoning = r + accumulatedContent = c + }, + chunk.usage, + aguiState, + ) + } + + // Capture usage from a trailing `choices: []` chunk that the + // choice loop above would have skipped. OpenRouter (and other + // OpenAI-compatible streams) often report final token counts in + // a terminal chunk with no choices, after `finishReason` was + // delivered on an earlier chunk. + if (chunk.usage && !aguiState.deferredUsage) { + aguiState.deferredUsage = { + promptTokens: chunk.usage.promptTokens || 0, + completionTokens: chunk.usage.completionTokens || 0, + totalTokens: chunk.usage.totalTokens || 0, + } + } } // Emit RUN_FINISHED after the stream ends so we capture usage from // any chunk (some SDKs send usage on a separate trailing chunk). if (aguiState.hasEmittedRunFinished && aguiState.computedFinishReason) { + // Deferral: RUN_FINISHED waits for the stream to fully drain + // because OpenRouter delivers `usage.cost` in a trailing + // `choices: []` chunk that arrives after `finishReason`. Cost is + // captured by the HTTPClient response hook while the SDK was + // draining the stream; `take()` awaits only the matching parse so + // overlapping sends don't block each other. If absent, RUN_FINISHED + // simply omits `usage.cost`. + const costInfo = responseId + ? 
await this.costStore.take(responseId) + : undefined yield asChunk({ type: 'RUN_FINISHED', runId: aguiState.runId, threadId: aguiState.threadId, model: currentModel || options.model, timestamp, - usage: aguiState.deferredUsage, + usage: this.buildRunFinishedUsage(aguiState.deferredUsage, costInfo), finishReason: aguiState.computedFinishReason, }) } @@ -332,6 +366,25 @@ export class OpenRouterTextAdapter< return utilGenerateId(this.name) } + private buildRunFinishedUsage( + usage: + | { promptTokens: number; completionTokens: number; totalTokens: number } + | undefined, + costInfo: CostInfo | undefined, + ): RunFinishedEvent['usage'] { + // If no token counts arrived (e.g. the stream aborted before the + // trailing usage chunk), emit no usage at all — even if `costInfo` was + // captured. Synthesizing zero-token counts alongside a non-zero cost + // would feed billing/telemetry a "successful run with zero tokens but + // $X cost" signal, which is worse than an absent usage payload. + if (!usage) return undefined + return { + ...usage, + ...(costInfo?.cost !== undefined && { cost: costInfo.cost }), + ...(costInfo?.costDetails && { costDetails: costInfo.costDetails }), + } + } + private *processChoice( choice: ChatStreamChoice, toolCallBuffers: Map, diff --git a/packages/typescript/ai-openrouter/tests/cost-capture.test.ts b/packages/typescript/ai-openrouter/tests/cost-capture.test.ts new file mode 100644 index 000000000..0d554bd47 --- /dev/null +++ b/packages/typescript/ai-openrouter/tests/cost-capture.test.ts @@ -0,0 +1,481 @@ +import { describe, expect, it, vi } from 'vitest' +import { HTTPClient } from '@openrouter/sdk' +import { + CostStore, + attachCostCapture, + createCostCaptureHook, +} from '../src/adapters/cost-capture' +import type { CostInfo } from '../src/adapters/cost-capture' +import type { Fetcher } from '@openrouter/sdk' + +function makeSseResponse(body: string): Response { + return new Response(body, { + status: 200, + headers: { 'content-type': 
'text/event-stream' }, + }) +} + +function makeJsonResponse(payload: unknown): Response { + return new Response(JSON.stringify(payload), { + status: 200, + headers: { 'content-type': 'application/json' }, + }) +} + +const fakeFetcher = (response: Response): Fetcher => async () => response + +async function readAll(stream: ReadableStream | null): Promise { + if (!stream) return '' + const reader = stream.getReader() + const decoder = new TextDecoder() + let out = '' + while (true) { + const { value, done } = await reader.read() + if (value) out += decoder.decode(value, { stream: true }) + if (done) break + } + return out +} + +const chatUrl = 'https://openrouter.ai/api/v1/chat/completions' + +function makeChatRequest(url = chatUrl): Request { + return new Request(url, { method: 'POST' }) +} + +function buildClient( + response: Response, + store: CostStore = new CostStore(), +): { client: HTTPClient; store: CostStore } { + const client = new HTTPClient({ fetcher: fakeFetcher(response) }) + client.addHook('response', createCostCaptureHook(store)) + return { client, store } +} + +describe('createCostCaptureHook — SSE chat-completion responses', () => { + it('extracts cost and cost_details from the trailing usage chunk', async () => { + const body = + `data: ${JSON.stringify({ id: 'gen-1', choices: [{ delta: { content: 'Hi' }, index: 0 }] })}\n\n` + + `data: ${JSON.stringify({ + id: 'gen-1', + choices: [], + usage: { + prompt_tokens: 5, + completion_tokens: 2, + total_tokens: 7, + cost: 0.001234, + cost_details: { + upstream_inference_cost: 0.001, + cache_discount: -0.0001, + }, + }, + })}\n\n` + + `data: [DONE]\n\n` + + const { client, store } = buildClient(makeSseResponse(body)) + const res = await client.request(makeChatRequest()) + + expect(await readAll(res.body)).toBe(body) + expect(await store.take('gen-1')).toEqual({ + cost: 0.001234, + costDetails: { + upstreamInferenceCost: 0.001, + cacheDiscount: -0.0001, + }, + }) + }) + + it('handles cost without 
cost_details', async () => { + const body = + `data: ${JSON.stringify({ + id: 'gen-2', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.5, + }, + })}\n\ndata: [DONE]\n\n` + + const { client, store } = buildClient(makeSseResponse(body)) + const res = await client.request(makeChatRequest()) + await readAll(res.body) + + expect(await store.take('gen-2')).toEqual({ cost: 0.5 }) + }) + + // Skipping an orphan `cost_details` is deliberate: a breakdown without the + // authoritative `cost` total can't be reconciled, and surfacing it invites + // callers to misread a partial bill. + it.each([ + [ + 'no cost_details', + { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 }, + ], + [ + 'orphan cost_details', + { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost_details: { upstream_inference_cost: 0.0005 }, + }, + ], + ])( + 'stores nothing when the response has no cost field (%s)', + async (_label, usage) => { + const id = 'gen-nocost' + const body = + `data: ${JSON.stringify({ id, choices: [], usage })}\n\ndata: [DONE]\n\n` + + const { client, store } = buildClient(makeSseResponse(body)) + const res = await client.request(makeChatRequest()) + await readAll(res.body) + + expect(await store.take(id)).toBeUndefined() + }, + ) +}) + +describe('createCostCaptureHook — non-streaming responses are skipped', () => { + // Non-SSE responses on /chat/completions come from `structuredOutput()` + // (stream: false). These never consume the cost store, so cloning and + // re-parsing them is wasted work — the hook should bail early. 
+ it('does not clone or parse a JSON response on the chat completions endpoint', async () => { + const payload = { + id: 'gen-json-1', + choices: [{ message: { content: '{}' } }], + usage: { + prompt_tokens: 4, + completion_tokens: 2, + total_tokens: 6, + cost: 0.0008, + cost_details: { upstream_inference_cost: 0.0005 }, + }, + } + const json = JSON.stringify(payload) + + const { client, store } = buildClient(makeJsonResponse(payload)) + const res = await client.request(makeChatRequest()) + + // Body is delivered to the caller unchanged and nothing is cached. + expect(await res.text()).toBe(json) + expect(await store.take('gen-json-1')).toBeUndefined() + }) +}) + +describe('createCostCaptureHook — passes through unrelated requests', () => { + it('does not parse non-chat responses', async () => { + const body = JSON.stringify({ data: { id: 'x', total_cost: 5 } }) + const passed = new Response(body, { + status: 200, + headers: { 'content-type': 'application/json' }, + }) + + const { client, store } = buildClient(passed) + const res = await client.request( + new Request('https://openrouter.ai/api/v1/generation?id=x'), + ) + + expect(await res.text()).toBe(body) + expect(await store.take('x')).toBeUndefined() + }) + + it('ignores SSE responses whose query string mentions /chat/completions', async () => { + // Pathname-only matching: a query param that happens to contain the + // chat-completions path must not activate the hook on an unrelated + // endpoint. 
+ const body = + `data: ${JSON.stringify({ + id: 'spoof-1', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.42, + }, + })}\n\ndata: [DONE]\n\n` + + const { client, store } = buildClient(makeSseResponse(body)) + const res = await client.request( + makeChatRequest( + 'https://openrouter.ai/api/v1/generation?next=/chat/completions', + ), + ) + + expect(await readAll(res.body)).toBe(body) + expect(await store.take('spoof-1')).toBeUndefined() + }) +}) + +describe('createCostCaptureHook — robustness', () => { + it('survives malformed SSE payloads without breaking the SDK stream', async () => { + const body = + `data: not-json\n\n` + + `data: ${JSON.stringify({ + id: 'gen-mix', + choices: [], + usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2, cost: 0.1 }, + })}\n\ndata: [DONE]\n\n` + + const { client, store } = buildClient(makeSseResponse(body)) + const res = await client.request(makeChatRequest()) + + expect(await readAll(res.body)).toBe(body) + expect(await store.take('gen-mix')).toEqual({ cost: 0.1 }) + }) + + it('returns the original response unchanged when there is no body', async () => { + const { client } = buildClient(new Response(null, { status: 204 })) + const res = await client.request(makeChatRequest()) + expect(res.status).toBe(204) + }) + + // Regression: proxies and some runtimes emit spec-compliant CRLF-framed + // SSE (`\r\n\r\n`). Splitting only on `\n\n` used to silently drop cost. 
+ it('parses SSE with CRLF-delimited frames', async () => { + const body = + `data: ${JSON.stringify({ + id: 'gen-crlf', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.3, + }, + })}\r\n\r\ndata: [DONE]\r\n\r\n` + + const { client, store } = buildClient(makeSseResponse(body)) + const res = await client.request(makeChatRequest()) + await readAll(res.body) + + expect(await store.take('gen-crlf')).toEqual({ cost: 0.3 }) + }) + + // Regression: EOF-terminated SSE (proxies omitting the trailing blank + // line) used to drop the final usage chunk because the read loop broke + // on `done` before flushing `buffer`. + it('flushes the trailing SSE frame when the stream ends without a blank line', async () => { + const body = `data: ${JSON.stringify({ + id: 'gen-eof', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.2, + }, + })}\n` + + const { client, store } = buildClient(makeSseResponse(body)) + const res = await client.request(makeChatRequest()) + await readAll(res.body) + + expect(await store.take('gen-eof')).toEqual({ cost: 0.2 }) + }) +}) + +describe('attachCostCapture', () => { + const costBody = `data: ${JSON.stringify({ + id: 'gen-attach', + choices: [], + usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2, cost: 0.25 }, + })}\n\ndata: [DONE]\n\n` + + it('returns a fresh HTTPClient when no caller client is supplied', () => { + const store = new CostStore() + const wrapped = attachCostCapture(store) + expect(wrapped).toBeInstanceOf(HTTPClient) + }) + + it('clones the caller-supplied HTTPClient rather than mutating it', async () => { + const callerClient = new HTTPClient({ + fetcher: fakeFetcher(makeSseResponse(costBody)), + }) + + const store = new CostStore() + const wrapped = attachCostCapture(store, callerClient) + + expect(wrapped).not.toBe(callerClient) + + // Calling the caller's original client must not populate our store — + // proves we did not 
mutate it. + const direct = await callerClient.request(makeChatRequest()) + await readAll(direct.body) + expect(await store.take('gen-attach')).toBeUndefined() + }) + + it('preserves hooks registered on the caller before wrapping', async () => { + const callerClient = new HTTPClient({ + fetcher: fakeFetcher(makeSseResponse(costBody)), + }) + const callerHook = vi.fn() + callerClient.addHook('response', callerHook) + + const store = new CostStore() + const wrapped = attachCostCapture(store, callerClient) + + const res = await wrapped.request(makeChatRequest()) + await readAll(res.body) + + expect(callerHook).toHaveBeenCalledTimes(1) + expect(await store.take('gen-attach')).toEqual({ cost: 0.25 }) + }) + + it('inherits the caller fetcher (proxies, tracing, retries, etc.)', async () => { + const callerFetcher = vi.fn(fakeFetcher(makeSseResponse(costBody))) + const callerClient = new HTTPClient({ fetcher: callerFetcher }) + + const store = new CostStore() + const wrapped = attachCostCapture(store, callerClient) + + const res = await wrapped.request(makeChatRequest()) + await readAll(res.body) + + expect(callerFetcher).toHaveBeenCalledTimes(1) + expect(await store.take('gen-attach')).toEqual({ cost: 0.25 }) + }) +}) + +describe('CostStore', () => { + it('take() removes the entry after reading', async () => { + const store = new CostStore() + store.set('a', { cost: 1 }) + expect(await store.take('a')).toEqual({ cost: 1 }) + expect(await store.take('a')).toBeUndefined() + }) + + it('overwrites entries with the same id', async () => { + const store = new CostStore() + store.set('a', { cost: 1 }) + store.set('a', { cost: 2 }) + expect(await store.take('a')).toEqual({ cost: 2 }) + }) + + // Regression: the tee'd parse is fire-and-forget, so the adapter can + // reach `take(id)` before `store.set(id, ...)` has run. `take` must + // await outstanding parses to keep cost capture deterministic. 
+ it('take() awaits an in-flight parse before reading', async () => { + const store = new CostStore() + let resolveParse!: () => void + const parse = new Promise((resolve) => { + resolveParse = resolve + }).then(() => { + store.set('racey', { cost: 7 }) + }) + store.recordParse(parse) + + let taken: CostInfo | undefined + const takePromise = store.take('racey').then((info) => { + taken = info + }) + + expect(taken).toBeUndefined() + resolveParse() + await takePromise + + expect(taken).toEqual({ cost: 7 }) + }) + + // Regression: a shared adapter can have overlapping `chat.send` calls. + // Previously `take(id)` awaited *every* in-flight parse, so a long- + // running concurrent stream could block an already-completed request's + // RUN_FINISHED. Per-id announcements mean `take(id)` awaits only the + // matching parse. + it('take(id) does not block on an unrelated in-flight parse', async () => { + const store = new CostStore() + + let resolveFast!: () => void + const fastParse = new Promise((resolve) => { + resolveFast = resolve + }).then(() => { + store.set('fast', { cost: 1 }) + }) + store.recordParse(fastParse) + store.announceId('fast', fastParse) + + let resolveSlow!: () => void + const slowParse = new Promise((resolve) => { + resolveSlow = resolve + }) + store.recordParse(slowParse) + store.announceId('slow', slowParse) + + let taken: CostInfo | undefined + const takePromise = store.take('fast').then((info) => { + taken = info + }) + + resolveFast() + await takePromise + + expect(taken).toEqual({ cost: 1 }) + + resolveSlow() + await slowParse + }) + + // Regression: a parse that announces its id but never produces cost + // (response simply had no `usage.cost`) used to lose its `idToParse` + // entry in `parse.finally`, forcing a subsequent `take(id)` to fall + // through to the pending-parses wait — which could then block on an + // unrelated concurrent stream. The settled parse must stay resolvable + // until `take(id)` consumes it (or TTL expires). 
+ it('take(id) returns undefined promptly when the matching parse finished without cost', async () => { + const store = new CostStore() + + // Parse A announces id but never calls `set`, then settles. + const parseA = Promise.resolve() + store.recordParse(parseA) + store.announceId('A', parseA) + await parseA + + // Parse B stays in flight and never announces 'A'. + let resolveB!: () => void + const parseB = new Promise((resolve) => { + resolveB = resolve + }) + store.recordParse(parseB) + store.announceId('B', parseB) + + const info = await store.take('A') + expect(info).toBeUndefined() + + resolveB() + await parseB + }) +}) + +describe('createCostCaptureHook — resilience to preceding hooks', () => { + it('does not fail the request when a preceding hook consumed the body', async () => { + const body = + `data: ${JSON.stringify({ + id: 'gen-disturbed', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.9, + }, + })}\n\ndata: [DONE]\n\n` + + const client = new HTTPClient({ + fetcher: fakeFetcher(makeSseResponse(body)), + }) + // A preceding caller hook reads the body directly; this disturbs `res` + // so that subsequent `res.clone()` throws. We must not surface that as + // a request failure. 
+ client.addHook('response', async (res) => { + await res.text() + }) + const store = new CostStore() + client.addHook('response', createCostCaptureHook(store)) + + await expect(client.request(makeChatRequest())).resolves.toBeDefined() + expect(await store.take('gen-disturbed')).toBeUndefined() + }) +}) diff --git a/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts b/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts index 206d16525..ee1e88109 100644 --- a/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts +++ b/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts @@ -1,20 +1,30 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' import { chat } from '@tanstack/ai' import { resolveDebugOption } from '@tanstack/ai/adapter-internals' +import { HTTPClient } from '@openrouter/sdk' import { ChatRequest$outboundSchema } from '@openrouter/sdk/models' import { createOpenRouterText } from '../src/adapters/text' -import type { OpenRouterTextModelOptions } from '../src/adapters/text' -import type { StreamChunk, Tool } from '@tanstack/ai' +import type { + OpenRouterTextAdapter, + OpenRouterTextModelOptions, +} from '../src/adapters/text' +import type { CostStore } from '../src/adapters/cost-capture' +import type { RunFinishedEvent, StreamChunk, Tool } from '@tanstack/ai' +import type * as OpenRouterSDK from '@openrouter/sdk' // Test helper: a silent logger for test chatStream calls. const testLogger = resolveDebugOption(false) -// Declare mockSend at module level +// Declare mocks at module level let mockSend: any -// Mock the SDK with a class defined inline -// eslint-disable-next-line @typescript-eslint/require-await +// Mock `OpenRouter` so unit tests can drive chat chunks without real HTTP. +// Keep the real `HTTPClient` (and other exports) since the adapter clones +// the caller's HTTPClient via `attachCostCapture` — a stubbed HTTPClient +// would break that path and hide regressions. 
vi.mock('@openrouter/sdk', async () => { + const actual = await vi.importActual('@openrouter/sdk') return { + ...actual, OpenRouter: class { chat = { send: (...args: Array) => mockSend(...args), @@ -26,6 +36,13 @@ vi.mock('@openrouter/sdk', async () => { const createAdapter = () => createOpenRouterText('openai/gpt-4o-mini', 'test-key') +// Tests that exercise cost-aware behavior seed the adapter's cost store +// directly, since the real hook only fires on actual HTTP responses (which +// are mocked away here). Real-HTTP coverage lives in tests/cost-capture.test.ts. +function getCostStore(adapter: OpenRouterTextAdapter): CostStore { + return (adapter as unknown as { costStore: CostStore }).costStore +} + const toolArguments = JSON.stringify({ location: 'Berlin' }) const weatherTool: Tool = { @@ -1664,3 +1681,254 @@ describe('OpenRouter STEP event consistency', () => { expect(stepFinished).toHaveLength(1) }) }) +describe('OpenRouter cost tracking', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + const baseStreamChunks = (id = 'gen-cost-1') => [ + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: 'Hi' }, finishReason: null }], + }, + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: {}, finishReason: 'stop' }], + usage: { promptTokens: 9, completionTokens: 3, totalTokens: 12 }, + }, + ] + + async function runAndGetFinished( + adapter: OpenRouterTextAdapter, + ): Promise { + let finished: RunFinishedEvent | undefined + for await (const chunk of chat({ + adapter, + messages: [{ role: 'user', content: 'hi' }], + })) { + if (chunk.type === 'RUN_FINISHED') finished = chunk as RunFinishedEvent + } + return finished + } + + it('attaches cost (USD) and details to RUN_FINISHED.usage when the hook populated the store', async () => { + setupMockSdkClient(baseStreamChunks('gen-cost-1')) + + const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') + getCostStore(adapter).set('gen-cost-1', { + cost: 0.0012, + costDetails: { + 
upstreamInferenceCost: 0.001, + cacheDiscount: -0.0001, + }, + }) + + const finished = await runAndGetFinished(adapter) + + expect(finished?.usage).toMatchObject({ + promptTokens: 9, + completionTokens: 3, + totalTokens: 12, + cost: 0.0012, + costDetails: { + upstreamInferenceCost: 0.001, + cacheDiscount: -0.0001, + }, + }) + }) + + it('omits cost when the response had no cost in its trailing chunk', async () => { + setupMockSdkClient(baseStreamChunks('gen-cost-2')) + + // No store seeding — simulates a response without cost (or one the + // hook never populated). + const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') + const finished = await runAndGetFinished(adapter) + + expect(finished?.usage).toEqual({ + promptTokens: 9, + completionTokens: 3, + totalTokens: 12, + }) + expect(finished?.usage).not.toHaveProperty('cost') + }) + + it('uses usage from a trailing usage-only chunk and still attaches cost', async () => { + const id = 'gen-trailing-usage' + const streamChunks = [ + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: 'Hi' }, finishReason: null }], + }, + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: {}, finishReason: 'stop' }], + }, + { + id, + model: 'openai/gpt-4o-mini', + choices: [], + usage: { promptTokens: 7, completionTokens: 2, totalTokens: 9 }, + }, + ] + + setupMockSdkClient(streamChunks) + + const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') + getCostStore(adapter).set(id, { cost: 0.42 }) + + const finished = await runAndGetFinished(adapter) + + expect(finished?.usage).toMatchObject({ + promptTokens: 7, + completionTokens: 2, + totalTokens: 9, + cost: 0.42, + }) + }) + + it('cost-store entries are consumed (each id is read at most once)', async () => { + setupMockSdkClient(baseStreamChunks('gen-consume')) + const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') + const store = getCostStore(adapter) + store.set('gen-consume', { cost: 0.99 }) + + await 
runAndGetFinished(adapter) + + expect(await store.take('gen-consume')).toBeUndefined() + }) + + // Regression: passing a custom httpClient used to bypass cost tracking + // entirely. The adapter now clones the caller's client and attaches the + // cost-capture hook to the clone, so the store is still populated and + // RUN_FINISHED still carries cost. + it('tracks cost when the caller supplies a custom httpClient', async () => { + setupMockSdkClient(baseStreamChunks('gen-custom-http')) + + const customClient = new HTTPClient() + const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key', { + httpClient: customClient, + }) + + const store = getCostStore(adapter) + expect(store).toBeDefined() + store.set('gen-custom-http', { cost: 0.75 }) + + const finished = await runAndGetFinished(adapter) + + expect(finished?.usage).toMatchObject({ cost: 0.75 }) + }) + + // Regression: when the stream aborts after finishReason but before the + // trailing usage chunk, the tee'd cost parser can still populate cost + // (it reads independently of the SDK consumer). Emitting RUN_FINISHED + // with zero-token usage alongside a non-zero cost would be worse than + // no usage at all — billing/telemetry consumers would see a bogus + // "0 tokens, $X cost" signal. Drop usage entirely if tokens are absent. + it('does not synthesize zero-token usage when the trailing usage chunk never arrives', async () => { + const id = 'gen-late-abort-no-usage' + // No `usage` on the finishReason chunk — OpenRouter normally delivers + // it in a separate trailing chunk that this scenario never receives. 
+ const okChunks = [ + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: 'Hi' }, finishReason: null }], + }, + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: {}, finishReason: 'stop' }], + }, + ] + mockSend = vi.fn().mockImplementation(() => + Promise.resolve({ + [Symbol.asyncIterator]() { + let i = 0 + return { + async next() { + if (i < okChunks.length) return { value: okChunks[i++]!, done: false } + throw new Error('aborted before usage chunk') + }, + } + }, + }), + ) + + const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') + // Simulate the tee'd parser beating the SDK consumer to the usage + // chunk: cost is in the store even though `finalUsage` will stay + // undefined for the SDK-visible stream. + getCostStore(adapter).set(id, { cost: 0.05 }) + + const chunks: Array = [] + for await (const chunk of chat({ + adapter, + messages: [{ role: 'user', content: 'hi' }], + })) { + chunks.push(chunk) + } + + const finished = chunks.find((c) => c.type === 'RUN_FINISHED') + expect(finished).toBeDefined() + if (finished?.type === 'RUN_FINISHED') { + expect(finished.usage).toBeUndefined() + expect(finished.finishReason).toBe('stop') + } + }) + + // Regression: deferring RUN_FINISHED until the stream drains (so we can + // read the trailing usage chunk) used to convert a late abort/disconnect + // into a user-visible RUN_ERROR — even when the run was logically + // complete. A terminal finishReason must still produce RUN_FINISHED. 
+ it('still emits RUN_FINISHED when the stream errors after finishReason', async () => { + const id = 'gen-late-abort' + const okChunks = [ + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: { content: 'Hi' }, finishReason: null }], + }, + { + id, + model: 'openai/gpt-4o-mini', + choices: [{ delta: {}, finishReason: 'stop' }], + usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, + }, + ] + mockSend = vi.fn().mockImplementation(() => + Promise.resolve({ + [Symbol.asyncIterator]() { + let i = 0 + return { + async next() { + if (i < okChunks.length) return { value: okChunks[i++]!, done: false } + throw new Error('connection dropped after finishReason') + }, + } + }, + }), + ) + + const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') + const chunks: Array = [] + for await (const chunk of chat({ + adapter, + messages: [{ role: 'user', content: 'hi' }], + })) { + chunks.push(chunk) + } + + const finished = chunks.find((c) => c.type === 'RUN_FINISHED') + const errored = chunks.find((c) => c.type === 'RUN_ERROR') + expect(errored).toBeUndefined() + expect(finished).toBeDefined() + if (finished?.type === 'RUN_FINISHED') { + expect(finished.finishReason).toBe('stop') + } + }) +}) diff --git a/packages/typescript/ai/src/activities/chat/middleware/types.ts b/packages/typescript/ai/src/activities/chat/middleware/types.ts index 19ce04586..39b308ce4 100644 --- a/packages/typescript/ai/src/activities/chat/middleware/types.ts +++ b/packages/typescript/ai/src/activities/chat/middleware/types.ts @@ -1,4 +1,10 @@ -import type { ModelMessage, StreamChunk, Tool, ToolCall } from '../../../types' +import type { + ModelMessage, + StreamChunk, + Tool, + ToolCall, + UsageTotals, +} from '../../../types' // =========================== // Middleware Context @@ -221,14 +227,11 @@ export interface ToolPhaseCompleteInfo { // =========================== /** - * Token usage statistics passed to the onUsage hook. 
- * Extracted from the RUN_FINISHED chunk when usage data is present. + * Token usage statistics passed to the onUsage hook. Aliased to the + * canonical `UsageTotals` so this surface cannot drift from + * `RunFinishedEvent.usage`, which is where the runtime sources it from. */ -export interface UsageInfo { - promptTokens: number - completionTokens: number - totalTokens: number -} +export type UsageInfo = UsageTotals // =========================== // Terminal Hook Info @@ -245,11 +248,7 @@ export interface FinishInfo { /** Final accumulated text content */ content: string /** Final usage totals, if available */ - usage?: { - promptTokens: number - completionTokens: number - totalTokens: number - } + usage?: UsageInfo } /** diff --git a/packages/typescript/ai/src/types.ts b/packages/typescript/ai/src/types.ts index e11e7176f..aebd75e47 100644 --- a/packages/typescript/ai/src/types.ts +++ b/packages/typescript/ai/src/types.ts @@ -818,17 +818,41 @@ export interface RunStartedEvent extends AGUIRunStartedEvent { * @ag-ui/core provides: `threadId`, `runId`, `result?` * TanStack AI adds: `model?`, `finishReason?`, `usage?` */ +/** + * Per-run token and cost totals, shared between `RunFinishedEvent.usage` + * and the middleware `UsageInfo` so the two can never drift. + */ +export interface UsageTotals { + promptTokens: number + completionTokens: number + totalTokens: number + /** + * USD cost. Optional because most providers don't report cost per + * request — the canonical source is the user's billing dashboard, not + * the response. Adapters MUST forward only what the provider returned; + * they MUST NOT multiply tokens × price tables, since stale or wrong + * pricing data would silently corrupt accounting. + */ + cost?: number + /** + * Provider-reported cost breakdown. Loosely typed because providers + * disagree on what to expose (BYOK upstream costs, cache discounts, + * per-tier rates, ...) 
and locking the shape down would force every + * new adapter to either lie or back-fill. + */ + costDetails?: { + upstreamInferenceCost?: number | null + cacheDiscount?: number | null + } +} + export interface RunFinishedEvent extends AGUIRunFinishedEvent { /** Model identifier for multi-model support */ model?: string /** Why the generation stopped */ finishReason?: 'stop' | 'length' | 'content_filter' | 'tool_calls' | null - /** Token usage statistics */ - usage?: { - promptTokens: number - completionTokens: number - totalTokens: number - } + /** Token and cost totals */ + usage?: UsageTotals } /** From ccf22735f56258cfb523924927b5a367ed2e145c Mon Sep 17 00:00:00 2001 From: Season Date: Sat, 18 Apr 2026 23:41:39 +0800 Subject: [PATCH 2/3] feat(ai): allow provider-specific numeric fields on UsageTotals.costDetails The JSDoc promised "loosely typed" costDetails to accommodate provider divergence (BYOK upstream costs, cache discounts, per-tier rates) but the declared shape only allowed two OpenRouter-specific keys, so any other adapter would have been forced to `as any`-cast to report its own breakdown. Add a numeric index signature so additional keys are type-legal without a cast, matching the documented intent. Picked up in CodeRabbit review of #469. --- packages/typescript/ai/src/types.ts | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/typescript/ai/src/types.ts b/packages/typescript/ai/src/types.ts index aebd75e47..85c310958 100644 --- a/packages/typescript/ai/src/types.ts +++ b/packages/typescript/ai/src/types.ts @@ -812,12 +812,6 @@ export interface RunStartedEvent extends AGUIRunStartedEvent { model?: string } -/** - * Emitted when a run completes successfully. 
- * - * @ag-ui/core provides: `threadId`, `runId`, `result?` - * TanStack AI adds: `model?`, `finishReason?`, `usage?` - */ /** * Per-run token and cost totals, shared between `RunFinishedEvent.usage` * and the middleware `UsageInfo` so the two can never drift. @@ -838,14 +832,23 @@ export interface UsageTotals { * Provider-reported cost breakdown. Loosely typed because providers * disagree on what to expose (BYOK upstream costs, cache discounts, * per-tier rates, ...) and locking the shape down would force every - * new adapter to either lie or back-fill. + * new adapter to either lie or back-fill. The named fields are + * whatever OpenRouter currently reports; other adapters may report + * numeric fields under their own keys via the index signature. */ costDetails?: { upstreamInferenceCost?: number | null cacheDiscount?: number | null + [key: string]: number | null | undefined } } +/** + * Emitted when a run completes successfully. + * + * @ag-ui/core provides: `threadId`, `runId`, `result?` + * TanStack AI adds: `model?`, `finishReason?`, `usage?` + */ export interface RunFinishedEvent extends AGUIRunFinishedEvent { /** Model identifier for multi-model support */ model?: string From 71d9c304ca9b322d5a9288340f05e4d543d99d72 Mon Sep 17 00:00:00 2001 From: Season Date: Fri, 24 Apr 2026 13:48:05 +0800 Subject: [PATCH 3/3] refine OpenRouter cost tracking --- .changeset/openrouter-cost-tracking.md | 6 +- README.md | 2 +- docs/adapters/openrouter.md | 8 +- .../src/adapters/cost-capture.ts | 52 ++++---- .../ai-openrouter/src/adapters/text.ts | 112 ++++++++--------- .../ai-openrouter/tests/cost-capture.test.ts | 118 +++++++++-------- .../tests/openrouter-adapter.test.ts | 119 ++---------------- packages/typescript/ai/src/types.ts | 21 ++-- 8 files changed, 168 insertions(+), 270 deletions(-) diff --git a/.changeset/openrouter-cost-tracking.md b/.changeset/openrouter-cost-tracking.md index c219d61fc..47eee66e5 100644 --- a/.changeset/openrouter-cost-tracking.md +++ 
b/.changeset/openrouter-cost-tracking.md @@ -3,9 +3,9 @@ '@tanstack/ai': minor --- -- Add OpenRouter cost tracking. The `OpenRouterTextAdapter` now attaches the authoritative per-request USD cost to the `RUN_FINISHED` event under `usage.cost`, along with `usage.costDetails.upstreamInferenceCost` and `usage.costDetails.cacheDiscount` when present. Cost is sourced from OpenRouter's chat completion response itself (the field arrives in the trailing SSE chunk), so there is no extra HTTP request and no added end-of-stream latency. -- Cost is always sourced from OpenRouter — never computed locally from token counts and a price table — because OpenRouter routes the same model id to different upstream providers (primary, fallback, BYOK) with different pricing, and applies cache discounts the SDK cannot reconstruct. -- Cost is captured via the SDK's public `HTTPClient` response hook. The hook calls `Response.clone()` and parses the cloned body out-of-band to recover `usage.cost` and `usage.cost_details`, which the @openrouter/sdk Zod parser would otherwise strip (the schema doesn't declare those fields). The SDK's stream consumer is unaffected — both clones are read independently. +- Add OpenRouter cost tracking. The `OpenRouterTextAdapter` now attaches OpenRouter's authoritative per-request cost amount to the `RUN_FINISHED` event under `usage.cost`, along with numeric/null fields from `usage.cost_details` under `usage.costDetails`. Cost is sourced from OpenRouter's chat completion response itself (the field arrives in the trailing SSE chunk), so there is no extra HTTP request and no added end-of-stream latency. +- Cost is always sourced from OpenRouter — never computed locally from token counts and a price table — because OpenRouter routes the same model id to different upstream providers (primary, fallback, BYOK) with different pricing, and may expose provider-specific cost breakdowns the SDK cannot reconstruct. 
+- Cost is captured via the SDK's public `HTTPClient` response hook. The hook calls `Response.clone()` and parses the cloned body out-of-band to recover `usage.cost` and `usage.cost_details`, which the @openrouter/sdk chat-completion parser would otherwise strip. The SDK's stream consumer is unaffected — both clones are read independently. - Custom `httpClient` values passed into the adapter are preserved: the adapter clones the caller's client (inheriting their fetcher, retries, tracing, and any pre-registered hooks) and appends the cost-capture hook to the clone, so the caller's original instance is never mutated and cost tracking still works when callers supply their own transport. - Defer the OpenRouter `RUN_FINISHED` emission until after the upstream stream fully drains, so token usage that arrives in a trailing usage-only chunk (the common case for OpenAI-compatible providers, where the final chunk has empty `choices`) is included in `usage` instead of being dropped. - Extend `RunFinishedEvent.usage` in `@tanstack/ai` with optional `cost` and `costDetails` fields. The middleware `UsageInfo` (consumed by `onUsage`) and `FinishInfo.usage` (consumed by `onFinish`) carry the same fields, so middleware authors can read cost without casts. The change is additive and backwards-compatible for adapters that don't populate cost. diff --git a/README.md b/README.md index 87907eca9..7bb11c5d0 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ A powerful, type-safe AI SDK for building AI-powered applications. 
- Isomorphic type-safe tools with server/client execution - **Enhanced integration with TanStack Start** - Share implementations between AI tools and server functions - **Observability events** - Structured, typed events for text, tools, image, speech, transcription, and video ([docs](./docs/guides/observability.md)) -- **Cost tracking** - Per-request USD cost on `RUN_FINISHED` for providers that report it (currently [OpenRouter](./docs/adapters/openrouter.md#cost-tracking)) +- **Cost tracking** - Per-request cost on `RUN_FINISHED` for providers that report it (currently [OpenRouter](./docs/adapters/openrouter.md#cost-tracking)) ### Read the docs → diff --git a/docs/adapters/openrouter.md b/docs/adapters/openrouter.md index 5442e8746..9a0a7bac0 100644 --- a/docs/adapters/openrouter.md +++ b/docs/adapters/openrouter.md @@ -117,9 +117,9 @@ OPENROUTER_API_KEY=sk-or-... ## Cost Tracking -The OpenRouter adapter attaches the authoritative per-request cost to the `RUN_FINISHED` event under `usage.cost` (USD). OpenRouter [reports cost inline in every chat response](https://openrouter.ai/docs/use-cases/usage-accounting), so cost arrives in the same SSE stream as the model output — there is **no extra HTTP request** and **no added latency**. +The OpenRouter adapter attaches the authoritative per-request cost to the `RUN_FINISHED` event under `usage.cost`. OpenRouter [reports cost inline in every chat response](https://openrouter.ai/docs/use-cases/usage-accounting), in credits, so cost arrives in the same SSE stream as the model output — there is **no extra HTTP request** and **no added latency**. -Why we don't compute cost locally from tokens × price: OpenRouter routes the same model id to different upstream providers (primary, fallback, BYOK), each with different pricing, plus applies cache discounts and BYOK upstream costs. A static price table would silently drift and produce wrong numbers. 
+Why we don't compute cost locally from tokens × price: OpenRouter routes the same model id to different upstream providers (primary, fallback, BYOK), each with different pricing, and may include cached-token pricing or BYOK upstream costs. A static price table would silently drift and produce wrong numbers. ```typescript import { chat } from "@tanstack/ai"; @@ -132,9 +132,8 @@ const stream = chat({ for await (const chunk of stream) { if (chunk.type === "RUN_FINISHED") { - console.log("USD cost:", chunk.usage?.cost); + console.log("OpenRouter cost:", chunk.usage?.cost); console.log("Upstream inference cost:", chunk.usage?.costDetails?.upstreamInferenceCost); - console.log("Cache discount:", chunk.usage?.costDetails?.cacheDiscount); } } ``` @@ -203,4 +202,3 @@ const stream = chat({ ``` **Supported models:** all OpenRouter chat models. See [Provider Tools](../tools/provider-tools.md#which-models-support-which-tools). - diff --git a/packages/typescript/ai-openrouter/src/adapters/cost-capture.ts b/packages/typescript/ai-openrouter/src/adapters/cost-capture.ts index e5eb564ad..069b0e2bb 100644 --- a/packages/typescript/ai-openrouter/src/adapters/cost-capture.ts +++ b/packages/typescript/ai-openrouter/src/adapters/cost-capture.ts @@ -1,11 +1,8 @@ import { HTTPClient } from '@openrouter/sdk' export interface CostInfo { - cost?: number - costDetails?: { - upstreamInferenceCost?: number | null - cacheDiscount?: number | null - } + cost: number + costDetails?: Record } interface CostEntry { @@ -310,42 +307,43 @@ function extractDataPayload(event: string): string | undefined { function safeParseJson(text: string): Record | undefined { try { const v = JSON.parse(text) - return v && typeof v === 'object' ? (v as Record) : undefined + return v && typeof v === 'object' + ? 
(v as Record<string, unknown>)
+    : undefined
   } catch {
     return undefined
   }
 }
 
-function pickNumberOrNull(
-  obj: Record<string, unknown> | undefined,
-  key: string,
-): number | null | undefined {
-  if (!obj) return undefined
-  const v = obj[key]
-  if (typeof v === 'number') return v
-  if (v === null) return null
-  return undefined
+function toCamelCase(key: string): string {
+  return key.replace(/_([a-z])/g, (_, char: string) => char.toUpperCase())
+}
+
+function extractCostDetails(
+  details: Record<string, unknown> | undefined,
+): CostInfo['costDetails'] | undefined {
+  if (!details) return undefined
+  const result: NonNullable<CostInfo['costDetails']> = {}
+  for (const [key, value] of Object.entries(details)) {
+    if (typeof value === 'number' || value === null) {
+      result[toCamelCase(key)] = value
+    }
+  }
+  return Object.keys(result).length > 0 ? result : undefined
 }
 
 function extractCostFromUsage(
   usage: Record<string, unknown>,
 ): CostInfo | undefined {
-  // `cost` is the authoritative per-request USD total. Emitting `costDetails`
-  // without it would surface a breakdown that can't be reconciled against a
-  // total — callers could misread it as the bill.
+  // `cost` is the authoritative per-request total reported by OpenRouter.
+  // Emitting `costDetails` without it would surface a breakdown that can't be
+  // reconciled against a total — callers could misread it as the bill.
   const cost = typeof usage.cost === 'number' ? 
usage.cost : undefined if (cost === undefined) return undefined const details = usage.cost_details as Record | undefined - const upstream = pickNumberOrNull(details, 'upstream_inference_cost') - const cacheDiscount = pickNumberOrNull(details, 'cache_discount') - const hasDetails = upstream !== undefined || cacheDiscount !== undefined + const costDetails = extractCostDetails(details) return { cost, - ...(hasDetails && { - costDetails: { - ...(upstream !== undefined && { upstreamInferenceCost: upstream }), - ...(cacheDiscount !== undefined && { cacheDiscount }), - }, - }), + ...(costDetails && { costDetails }), } } diff --git a/packages/typescript/ai-openrouter/src/adapters/text.ts b/packages/typescript/ai-openrouter/src/adapters/text.ts index 3ce84812a..69cdc74df 100644 --- a/packages/typescript/ai-openrouter/src/adapters/text.ts +++ b/packages/typescript/ai-openrouter/src/adapters/text.ts @@ -164,66 +164,66 @@ export class OpenRouterTextAdapter< if (chunk.id) responseId = chunk.id if (chunk.model) currentModel = chunk.model - // Emit RUN_STARTED on first chunk - if (!aguiState.hasEmittedRunStarted) { - aguiState.hasEmittedRunStarted = true - yield asChunk({ - type: 'RUN_STARTED', - runId: aguiState.runId, - threadId: aguiState.threadId, - model: currentModel || options.model, - timestamp, - }) - } + // Emit RUN_STARTED on first chunk + if (!aguiState.hasEmittedRunStarted) { + aguiState.hasEmittedRunStarted = true + yield asChunk({ + type: 'RUN_STARTED', + runId: aguiState.runId, + threadId: aguiState.threadId, + model: currentModel || options.model, + timestamp, + }) + } - if (chunk.error) { - // Emit AG-UI RUN_ERROR - yield asChunk({ - type: 'RUN_ERROR', - runId: aguiState.runId, - model: currentModel || options.model, - timestamp, + if (chunk.error) { + // Emit AG-UI RUN_ERROR + yield asChunk({ + type: 'RUN_ERROR', + runId: aguiState.runId, + model: currentModel || options.model, + timestamp, + message: chunk.error.message || 'Unknown error', + code: 
String(chunk.error.code), + error: { message: chunk.error.message || 'Unknown error', code: String(chunk.error.code), - error: { - message: chunk.error.message || 'Unknown error', - code: String(chunk.error.code), - }, - }) - continue - } + }, + }) + continue + } - for (const choice of chunk.choices) { - yield* this.processChoice( - choice, - toolCallBuffers, - { - id: responseId || this.generateId(), - model: currentModel, - timestamp, - }, - { reasoning: accumulatedReasoning, content: accumulatedContent }, - (r, c) => { - accumulatedReasoning = r - accumulatedContent = c - }, - chunk.usage, - aguiState, - ) - } + for (const choice of chunk.choices) { + yield* this.processChoice( + choice, + toolCallBuffers, + { + id: responseId || this.generateId(), + model: currentModel, + timestamp, + }, + { reasoning: accumulatedReasoning, content: accumulatedContent }, + (r, c) => { + accumulatedReasoning = r + accumulatedContent = c + }, + chunk.usage, + aguiState, + ) + } - // Capture usage from a trailing `choices: []` chunk that the - // choice loop above would have skipped. OpenRouter (and other - // OpenAI-compatible streams) often report final token counts in - // a terminal chunk with no choices, after `finishReason` was - // delivered on an earlier chunk. - if (chunk.usage && !aguiState.deferredUsage) { - aguiState.deferredUsage = { - promptTokens: chunk.usage.promptTokens || 0, - completionTokens: chunk.usage.completionTokens || 0, - totalTokens: chunk.usage.totalTokens || 0, - } + // Capture usage from a trailing `choices: []` chunk that the + // choice loop above would have skipped. OpenRouter (and other + // OpenAI-compatible streams) often report final token counts in + // a terminal chunk with no choices, after `finishReason` was + // delivered on an earlier chunk. 
+ if (chunk.usage && !aguiState.deferredUsage) { + aguiState.deferredUsage = { + promptTokens: chunk.usage.promptTokens || 0, + completionTokens: chunk.usage.completionTokens || 0, + totalTokens: chunk.usage.totalTokens || 0, } + } } // Emit RUN_FINISHED after the stream ends so we capture usage from @@ -376,11 +376,11 @@ export class OpenRouterTextAdapter< // trailing usage chunk), emit no usage at all — even if `costInfo` was // captured. Synthesizing zero-token counts alongside a non-zero cost // would feed billing/telemetry a "successful run with zero tokens but - // $X cost" signal, which is worse than an absent usage payload. + // nonzero cost" signal, which is worse than an absent usage payload. if (!usage) return undefined return { ...usage, - ...(costInfo?.cost !== undefined && { cost: costInfo.cost }), + ...(costInfo && { cost: costInfo.cost }), ...(costInfo?.costDetails && { costDetails: costInfo.costDetails }), } } diff --git a/packages/typescript/ai-openrouter/tests/cost-capture.test.ts b/packages/typescript/ai-openrouter/tests/cost-capture.test.ts index 0d554bd47..d0494098f 100644 --- a/packages/typescript/ai-openrouter/tests/cost-capture.test.ts +++ b/packages/typescript/ai-openrouter/tests/cost-capture.test.ts @@ -22,14 +22,19 @@ function makeJsonResponse(payload: unknown): Response { }) } -const fakeFetcher = (response: Response): Fetcher => async () => response - -async function readAll(stream: ReadableStream | null): Promise { +const fakeFetcher = + (response: Response): Fetcher => + () => + Promise.resolve(response) + +async function readAll( + stream: ReadableStream | null, +): Promise { if (!stream) return '' const reader = stream.getReader() const decoder = new TextDecoder() let out = '' - while (true) { + for (;;) { const { value, done } = await reader.read() if (value) out += decoder.decode(value, { stream: true }) if (done) break @@ -66,6 +71,8 @@ describe('createCostCaptureHook — SSE chat-completion responses', () => { cost: 0.001234, 
cost_details: { upstream_inference_cost: 0.001, + upstream_inference_input_cost: 0.0004, + upstream_inference_output_cost: 0.0006, cache_discount: -0.0001, }, }, @@ -80,23 +87,24 @@ describe('createCostCaptureHook — SSE chat-completion responses', () => { cost: 0.001234, costDetails: { upstreamInferenceCost: 0.001, + upstreamInferenceInputCost: 0.0004, + upstreamInferenceOutputCost: 0.0006, cacheDiscount: -0.0001, }, }) }) it('handles cost without cost_details', async () => { - const body = - `data: ${JSON.stringify({ - id: 'gen-2', - choices: [], - usage: { - prompt_tokens: 1, - completion_tokens: 1, - total_tokens: 2, - cost: 0.5, - }, - })}\n\ndata: [DONE]\n\n` + const body = `data: ${JSON.stringify({ + id: 'gen-2', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.5, + }, + })}\n\ndata: [DONE]\n\n` const { client, store } = buildClient(makeSseResponse(body)) const res = await client.request(makeChatRequest()) @@ -126,8 +134,7 @@ describe('createCostCaptureHook — SSE chat-completion responses', () => { 'stores nothing when the response has no cost field (%s)', async (_label, usage) => { const id = 'gen-nocost' - const body = - `data: ${JSON.stringify({ id, choices: [], usage })}\n\ndata: [DONE]\n\n` + const body = `data: ${JSON.stringify({ id, choices: [], usage })}\n\ndata: [DONE]\n\n` const { client, store } = buildClient(makeSseResponse(body)) const res = await client.request(makeChatRequest()) @@ -186,17 +193,16 @@ describe('createCostCaptureHook — passes through unrelated requests', () => { // Pathname-only matching: a query param that happens to contain the // chat-completions path must not activate the hook on an unrelated // endpoint. 
- const body = - `data: ${JSON.stringify({ - id: 'spoof-1', - choices: [], - usage: { - prompt_tokens: 1, - completion_tokens: 1, - total_tokens: 2, - cost: 0.42, - }, - })}\n\ndata: [DONE]\n\n` + const body = `data: ${JSON.stringify({ + id: 'spoof-1', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.42, + }, + })}\n\ndata: [DONE]\n\n` const { client, store } = buildClient(makeSseResponse(body)) const res = await client.request( @@ -217,7 +223,12 @@ describe('createCostCaptureHook — robustness', () => { `data: ${JSON.stringify({ id: 'gen-mix', choices: [], - usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2, cost: 0.1 }, + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.1, + }, })}\n\ndata: [DONE]\n\n` const { client, store } = buildClient(makeSseResponse(body)) @@ -236,17 +247,16 @@ describe('createCostCaptureHook — robustness', () => { // Regression: proxies and some runtimes emit spec-compliant CRLF-framed // SSE (`\r\n\r\n`). Splitting only on `\n\n` used to silently drop cost. 
it('parses SSE with CRLF-delimited frames', async () => { - const body = - `data: ${JSON.stringify({ - id: 'gen-crlf', - choices: [], - usage: { - prompt_tokens: 1, - completion_tokens: 1, - total_tokens: 2, - cost: 0.3, - }, - })}\r\n\r\ndata: [DONE]\r\n\r\n` + const body = `data: ${JSON.stringify({ + id: 'gen-crlf', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.3, + }, + })}\r\n\r\ndata: [DONE]\r\n\r\n` const { client, store } = buildClient(makeSseResponse(body)) const res = await client.request(makeChatRequest()) @@ -282,7 +292,12 @@ describe('attachCostCapture', () => { const costBody = `data: ${JSON.stringify({ id: 'gen-attach', choices: [], - usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2, cost: 0.25 }, + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.25, + }, })}\n\ndata: [DONE]\n\n` it('returns a fresh HTTPClient when no caller client is supplied', () => { @@ -451,17 +466,16 @@ describe('CostStore', () => { describe('createCostCaptureHook — resilience to preceding hooks', () => { it('does not fail the request when a preceding hook consumed the body', async () => { - const body = - `data: ${JSON.stringify({ - id: 'gen-disturbed', - choices: [], - usage: { - prompt_tokens: 1, - completion_tokens: 1, - total_tokens: 2, - cost: 0.9, - }, - })}\n\ndata: [DONE]\n\n` + const body = `data: ${JSON.stringify({ + id: 'gen-disturbed', + choices: [], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + cost: 0.9, + }, + })}\n\ndata: [DONE]\n\n` const client = new HTTPClient({ fetcher: fakeFetcher(makeSseResponse(body)), diff --git a/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts b/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts index ee1e88109..f8320132c 100644 --- a/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts +++ b/packages/typescript/ai-openrouter/tests/openrouter-adapter.test.ts 
@@ -410,7 +410,7 @@ describe('OpenRouter adapter option mapping', () => { const errorChunk = chunks.find((c) => c.type === 'RUN_ERROR') expect(errorChunk).toBeDefined() - if (errorChunk && errorChunk.type === 'RUN_ERROR') { + if (errorChunk) { expect(errorChunk.error?.message).toBe('Invalid API key') } }) @@ -1708,12 +1708,12 @@ describe('OpenRouter cost tracking', () => { adapter, messages: [{ role: 'user', content: 'hi' }], })) { - if (chunk.type === 'RUN_FINISHED') finished = chunk as RunFinishedEvent + if (chunk.type === 'RUN_FINISHED') finished = chunk } return finished } - it('attaches cost (USD) and details to RUN_FINISHED.usage when the hook populated the store', async () => { + it('attaches OpenRouter cost and details to RUN_FINISHED.usage when the hook populated the store', async () => { setupMockSdkClient(baseStreamChunks('gen-cost-1')) const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') @@ -1721,6 +1721,8 @@ describe('OpenRouter cost tracking', () => { cost: 0.0012, costDetails: { upstreamInferenceCost: 0.001, + upstreamInferenceInputCost: 0.0004, + upstreamInferenceOutputCost: 0.0006, cacheDiscount: -0.0001, }, }) @@ -1734,6 +1736,8 @@ describe('OpenRouter cost tracking', () => { cost: 0.0012, costDetails: { upstreamInferenceCost: 0.001, + upstreamInferenceInputCost: 0.0004, + upstreamInferenceOutputCost: 0.0006, cacheDiscount: -0.0001, }, }) @@ -1822,113 +1826,4 @@ describe('OpenRouter cost tracking', () => { expect(finished?.usage).toMatchObject({ cost: 0.75 }) }) - - // Regression: when the stream aborts after finishReason but before the - // trailing usage chunk, the tee'd cost parser can still populate cost - // (it reads independently of the SDK consumer). Emitting RUN_FINISHED - // with zero-token usage alongside a non-zero cost would be worse than - // no usage at all — billing/telemetry consumers would see a bogus - // "0 tokens, $X cost" signal. Drop usage entirely if tokens are absent. 
- it('does not synthesize zero-token usage when the trailing usage chunk never arrives', async () => { - const id = 'gen-late-abort-no-usage' - // No `usage` on the finishReason chunk — OpenRouter normally delivers - // it in a separate trailing chunk that this scenario never receives. - const okChunks = [ - { - id, - model: 'openai/gpt-4o-mini', - choices: [{ delta: { content: 'Hi' }, finishReason: null }], - }, - { - id, - model: 'openai/gpt-4o-mini', - choices: [{ delta: {}, finishReason: 'stop' }], - }, - ] - mockSend = vi.fn().mockImplementation(() => - Promise.resolve({ - [Symbol.asyncIterator]() { - let i = 0 - return { - async next() { - if (i < okChunks.length) return { value: okChunks[i++]!, done: false } - throw new Error('aborted before usage chunk') - }, - } - }, - }), - ) - - const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') - // Simulate the tee'd parser beating the SDK consumer to the usage - // chunk: cost is in the store even though `finalUsage` will stay - // undefined for the SDK-visible stream. - getCostStore(adapter).set(id, { cost: 0.05 }) - - const chunks: Array = [] - for await (const chunk of chat({ - adapter, - messages: [{ role: 'user', content: 'hi' }], - })) { - chunks.push(chunk) - } - - const finished = chunks.find((c) => c.type === 'RUN_FINISHED') - expect(finished).toBeDefined() - if (finished?.type === 'RUN_FINISHED') { - expect(finished.usage).toBeUndefined() - expect(finished.finishReason).toBe('stop') - } - }) - - // Regression: deferring RUN_FINISHED until the stream drains (so we can - // read the trailing usage chunk) used to convert a late abort/disconnect - // into a user-visible RUN_ERROR — even when the run was logically - // complete. A terminal finishReason must still produce RUN_FINISHED. 
- it('still emits RUN_FINISHED when the stream errors after finishReason', async () => { - const id = 'gen-late-abort' - const okChunks = [ - { - id, - model: 'openai/gpt-4o-mini', - choices: [{ delta: { content: 'Hi' }, finishReason: null }], - }, - { - id, - model: 'openai/gpt-4o-mini', - choices: [{ delta: {}, finishReason: 'stop' }], - usage: { promptTokens: 1, completionTokens: 1, totalTokens: 2 }, - }, - ] - mockSend = vi.fn().mockImplementation(() => - Promise.resolve({ - [Symbol.asyncIterator]() { - let i = 0 - return { - async next() { - if (i < okChunks.length) return { value: okChunks[i++]!, done: false } - throw new Error('connection dropped after finishReason') - }, - } - }, - }), - ) - - const adapter = createOpenRouterText('openai/gpt-4o-mini', 'test-key') - const chunks: Array = [] - for await (const chunk of chat({ - adapter, - messages: [{ role: 'user', content: 'hi' }], - })) { - chunks.push(chunk) - } - - const finished = chunks.find((c) => c.type === 'RUN_FINISHED') - const errored = chunks.find((c) => c.type === 'RUN_ERROR') - expect(errored).toBeUndefined() - expect(finished).toBeDefined() - if (finished?.type === 'RUN_FINISHED') { - expect(finished.finishReason).toBe('stop') - } - }) }) diff --git a/packages/typescript/ai/src/types.ts b/packages/typescript/ai/src/types.ts index 85c310958..9b1e257b4 100644 --- a/packages/typescript/ai/src/types.ts +++ b/packages/typescript/ai/src/types.ts @@ -821,26 +821,19 @@ export interface UsageTotals { completionTokens: number totalTokens: number /** - * USD cost. Optional because most providers don't report cost per - * request — the canonical source is the user's billing dashboard, not - * the response. Adapters MUST forward only what the provider returned; - * they MUST NOT multiply tokens × price tables, since stale or wrong - * pricing data would silently corrupt accounting. + * Provider-reported cost amount. Optional because most providers don't + * report cost per request. 
Adapters MUST forward only what the provider
+   * returned; they MUST NOT multiply tokens × price tables, since stale or
+   * wrong pricing data would silently corrupt accounting.
    */
   cost?: number
   /**
    * Provider-reported cost breakdown. Loosely typed because providers
    * disagree on what to expose (BYOK upstream costs, cache discounts,
-   * per-tier rates, ...) and locking the shape down would force every
-   * new adapter to either lie or back-fill. The named fields are
-   * whatever OpenRouter currently reports; other adapters may report
-   * numeric fields under their own keys via the index signature.
+   * per-tier rates, ...) and adapters should preserve only the numeric
+   * provider fields they received, using the provider's native units.
    */
-  costDetails?: {
-    upstreamInferenceCost?: number | null
-    cacheDiscount?: number | null
-    [key: string]: number | null | undefined
-  }
+  costDetails?: Record<string, number | null>
 }
 
 /**