|
- {run.runId}
+
+ {run.runId}
+
|
@@ -666,6 +705,7 @@ function OverviewView({ request }: { request: ServerRequest }) {
function RunsView({ request }: { request: ServerRequest }) {
const [data, setData] = useState(null);
const [error, setError] = useState(null);
+ const [filter, setFilter] = useState("");
useEffect(() => {
let cancelled = false;
@@ -684,13 +724,55 @@ function RunsView({ request }: { request: ServerRequest }) {
};
}, [request]);
- if (error) return ;
- if (!data) return ;
+ const filterLower = filter.trim().toLowerCase();
+ const filteredRuns = useMemo(() => {
+ if (!data) return [] as RunSummary[];
+ if (filterLower === "") return data.runs;
+ return data.runs.filter((run) => {
+ const haystack = [
+ run.runId,
+ run.preset ?? "",
+ run.label ?? "",
+ run.status,
+ ]
+ .join(" ")
+ .toLowerCase();
+ return haystack.includes(filterLower);
+ });
+ }, [data, filterLower]);
return (
<>
Runs
-
+
+
+ setFilter(event.currentTarget.value)}
+ placeholder="Filter by id, preset, label, status ( / )"
+ aria-label="Filter runs"
+ />
+
+ {error ? (
+
+ ) : !data ? (
+
+ ) : data.runs.length === 0 ? (
+
+ No runs recorded yet.
+
+ ) : filteredRuns.length === 0 ? (
+
+ No runs match "{filter}".
+
+ ) : (
+
+ )}
>
);
}
@@ -975,49 +1057,61 @@ function SuitesView({ request }: { request: ServerRequest }) {
/>
)}
Suites
-
-
-
- | Suite |
- Schema |
- Path |
- Objects |
-
-
-
- {suites.suites.map((suite) => (
-
- | {suite.id} |
- {suite.schema} |
- {suite.relativePath} |
- {suite.objectCount} |
+ {suites.suites.length === 0 ? (
+
+ No suites discovered under this data root.
+
+ ) : (
+
+
+
+ | Suite |
+ Schema |
+ Path |
+ Objects |
- ))}
-
-
+
+
+ {suites.suites.map((suite) => (
+
+ | {suite.id} |
+ {suite.schema} |
+ {suite.relativePath} |
+ {suite.objectCount} |
+
+ ))}
+
+
+ )}
Scenarios
-
-
-
- | Scenario |
- Name |
- Suite |
- Tags |
- Rubric |
-
-
-
- {scenarios.scenarios.map((scenario) => (
-
- | {scenario.id} |
- {scenario.name} |
- {scenario.suiteId} |
- {scenario.tags.join(", ") || "-"} |
- {scenario.rubric ?? "-"} |
+ {scenarios.scenarios.length === 0 ? (
+
+ No scenarios discovered.
+
+ ) : (
+
+
+
+ | Scenario |
+ Name |
+ Suite |
+ Tags |
+ Rubric |
- ))}
-
-
+
+
+ {scenarios.scenarios.map((scenario) => (
+
+ | {scenario.id} |
+ {scenario.name} |
+ {scenario.suiteId} |
+ {scenario.tags.join(", ") || "-"} |
+ {scenario.rubric ?? "-"} |
+
+ ))}
+
+
+ )}
>
);
}
@@ -1639,6 +1733,55 @@ function ServerDashboard() {
useLocalLinkInterception(navigate);
+ const shortcuts = useMemo(
+ () => [
+ {
+ key: "/",
+ description: "Focus search input",
+ run: () => {
+ const search = document.querySelector(
+ '[data-keynav-search="true"], #runs-search',
+ );
+ if (search) {
+ search.focus();
+ search.select?.();
+ }
+ },
+ },
+ {
+ key: "j",
+ description: "Move selection down in list",
+ run: () => moveKeynavRow(1),
+ },
+ {
+ key: "k",
+ description: "Move selection up in list",
+ run: () => moveKeynavRow(-1),
+ },
+ {
+ sequence: ["g", "r"],
+ key: "r",
+ description: "Go to Runs",
+ run: () => navigate("/runs"),
+ },
+ {
+ sequence: ["g", "p"],
+ key: "p",
+ description: "Go to Presets",
+ run: () => navigate("/presets"),
+ },
+ {
+ sequence: ["g", "s"],
+ key: "s",
+ description: "Go to Start run",
+ run: () => navigate("/start"),
+ },
+ ],
+ [navigate],
+ );
+
+ useKeyboardShortcuts({ shortcuts });
+
const onTokenChange = useCallback((nextToken: string) => {
writeStoredToken(nextToken);
setToken(nextToken);
diff --git a/dashboard/src/hooks/useKeyboardShortcuts.ts b/dashboard/src/hooks/useKeyboardShortcuts.ts
new file mode 100644
index 0000000..7aba3a8
--- /dev/null
+++ b/dashboard/src/hooks/useKeyboardShortcuts.ts
@@ -0,0 +1,121 @@
+import { useEffect, useRef } from "react";
+
+export type KeyboardShortcut = {
+ key: string;
+ sequence?: string[];
+ description: string;
+ run: (event: KeyboardEvent) => void;
+};
+
+export type KeyboardShortcutHandlerOptions = {
+ shortcuts: KeyboardShortcut[];
+ isEnabled?: () => boolean;
+ sequenceTimeoutMs?: number;
+};
+
+const DEFAULT_SEQUENCE_TIMEOUT_MS = 800;
+
+export function shouldIgnoreKeyboardEvent(target: EventTarget | null): boolean {
+ if (!target || !(target instanceof Element)) {
+ return false;
+ }
+ const tag = target.tagName;
+ if (tag === "INPUT" || tag === "TEXTAREA" || tag === "SELECT") {
+ return true;
+ }
+ if (tag === "BUTTON") {
+ return false;
+ }
+ const htmlEl = target as HTMLElement;
+ if (htmlEl.isContentEditable) {
+ return true;
+ }
+ return false;
+}
+
+export function createKeyboardDispatcher(
+ options: KeyboardShortcutHandlerOptions,
+): (event: KeyboardEvent) => void {
+ const { shortcuts, isEnabled } = options;
+ const timeoutMs = options.sequenceTimeoutMs ?? DEFAULT_SEQUENCE_TIMEOUT_MS;
+ const state = { pending: [] as string[], lastAt: 0 };
+
+ return (event: KeyboardEvent) => {
+ if (event.defaultPrevented) return;
+ if (event.ctrlKey || event.metaKey || event.altKey) return;
+ if (isEnabled && !isEnabled()) return;
+ if (shouldIgnoreKeyboardEvent(event.target)) return;
+
+ const now = Date.now();
+ if (now - state.lastAt > timeoutMs) {
+ state.pending = [];
+ }
+ state.lastAt = now;
+
+ const currentKey = event.key;
+ const nextSequence = [...state.pending, currentKey];
+
+ const sequenceMatch = shortcuts.find(
+ (shortcut) =>
+ shortcut.sequence !== undefined &&
+ shortcut.sequence.length === nextSequence.length &&
+ shortcut.sequence.every((key, idx) => key === nextSequence[idx]),
+ );
+ if (sequenceMatch) {
+ state.pending = [];
+ event.preventDefault();
+ sequenceMatch.run(event);
+ return;
+ }
+
+ const hasSequencePrefix = shortcuts.some(
+ (shortcut) =>
+ shortcut.sequence !== undefined &&
+ shortcut.sequence.length > nextSequence.length &&
+ shortcut.sequence
+ .slice(0, nextSequence.length)
+ .every((key, idx) => key === nextSequence[idx]),
+ );
+
+ if (hasSequencePrefix) {
+ state.pending = nextSequence;
+ event.preventDefault();
+ return;
+ }
+
+ const singleMatch = shortcuts.find(
+ (shortcut) =>
+ shortcut.sequence === undefined && shortcut.key === currentKey,
+ );
+ if (singleMatch) {
+ state.pending = [];
+ event.preventDefault();
+ singleMatch.run(event);
+ return;
+ }
+
+ state.pending = [];
+ };
+}
+
+export function useKeyboardShortcuts(
+ options: KeyboardShortcutHandlerOptions,
+): void {
+ const optionsRef = useRef(options);
+ optionsRef.current = options;
+ useEffect(() => {
+ const dispatcherRef: { current: (event: KeyboardEvent) => void } = {
+ current: createKeyboardDispatcher(optionsRef.current),
+ };
+ let previousShortcuts = optionsRef.current.shortcuts;
+ const handler = (event: KeyboardEvent): void => {
+ if (previousShortcuts !== optionsRef.current.shortcuts) {
+ dispatcherRef.current = createKeyboardDispatcher(optionsRef.current);
+ previousShortcuts = optionsRef.current.shortcuts;
+ }
+ dispatcherRef.current(event);
+ };
+ window.addEventListener("keydown", handler);
+ return () => window.removeEventListener("keydown", handler);
+ }, []);
+}
diff --git a/dashboard/src/styles.css b/dashboard/src/styles.css
index 4b069af..d8010f1 100644
--- a/dashboard/src/styles.css
+++ b/dashboard/src/styles.css
@@ -103,6 +103,43 @@ body {
padding: 16px;
color: var(--muted);
}
+.server-filter-row {
+ display: flex;
+ gap: 8px;
+ align-items: center;
+ margin: 8px 0 12px;
+}
+.server-filter-input {
+ flex: 1;
+ min-width: 200px;
+ background: var(--bg);
+ border: 1px solid var(--border);
+ border-radius: 6px;
+ color: var(--text);
+ padding: 8px 10px;
+}
+.server-filter-input:focus {
+ outline: 2px solid var(--indigo);
+ outline-offset: 1px;
+}
+.sr-only {
+ position: absolute;
+ width: 1px;
+ height: 1px;
+ margin: -1px;
+ border: 0;
+ padding: 0;
+ overflow: hidden;
+ clip: rect(0 0 0 0);
+ clip-path: inset(50%);
+ white-space: nowrap;
+}
+[data-keynav="row"]:focus-within,
+[data-keynav="row"]:focus,
+[data-keynav-link="true"]:focus {
+ outline: 2px solid var(--indigo);
+ outline-offset: -2px;
+}
.server-error {
color: var(--amber);
border-color: rgba(245, 158, 11, 0.35);
diff --git a/docs/RELIABILITY.md b/docs/RELIABILITY.md
index d01322c..3630e4b 100644
--- a/docs/RELIABILITY.md
+++ b/docs/RELIABILITY.md
@@ -40,3 +40,69 @@ The initial repository-level budgets are:
- Include the smallest stable identifiers needed for tracing a failure.
- Log enough context to reproduce the path, but not guessed or unvalidated
payloads.
+
+## Server metrics, spans, and budgets (Phase 4)
+
+The AgentProbe server ships a narrow in-process metrics registry and span
+recorder so operators can introspect a running server without depending on an
+external collector. All adapters live under
+`src/runtime/server/observability/`.
+
+### Shipped counters
+
+| Name | Labels | Purpose |
+| --- | --- | --- |
+| `server.http.requests` | `method`, `route`, `status` | Per-request volume by outcome. |
+| `server.runs.started_total` | `preset` | Runs accepted by the run controller. |
+| `server.runs.finished_total` | `preset` | Runs that reached a terminal state. |
+
+### Shipped gauges
+
+| Name | Purpose |
+| --- | --- |
+| `server.runs.active` | Active runs tracked by the controller. |
+| `server.sse.connections` | Open SSE subscribers across all runs. |
+
+### Shipped spans
+
+| Name | Where | Purpose |
+| --- | --- | --- |
+| `server.run.start.validation` | `RunController.start` | Validates OpenRouter configuration and suite conflicts. |
+| `server.run.controller.execute` | `RunController.execute` | End-to-end run execution wrapper. |
+| `server.run.suite.boot` | `RunController.execute` | Time from controller accept to first suite/scenario event. |
+
+### Latency budgets
+
+Run `bun run latency-budget --samples 25` to populate these numbers against
+seeded local data. Budgets are `p95` unless noted. CI is expected to stay
+well below the budget on loopback; degraded values should be investigated
+before shipping.
+
+| Surface | Budget (p95) |
+| --- | --- |
+| `GET /` (dashboard index) | 150 ms |
+| `GET /api/runs` | 150 ms |
+| `POST /api/runs` (validation rejection) | 200 ms |
+| SSE first-event latency | 200 ms |
+
+### SSE hardening contract
+
+- Every SSE response emits `retry: 2000` on connect and periodic heartbeat
+ comments every 15 seconds.
+- Terminal events (`run_finished`, `run_cancelled`, `run_failed`) are emitted
+ exactly once per run and close the stream after dispatch.
+- `Last-Event-ID` is honored from both the standard `Last-Event-ID` header and
+ the `last_event_id` query parameter.
+- Historical runs resolve terminal state on replay even when the ring buffer
+ has been dropped.
+- Proxy-safe headers (`cache-control: no-store, no-transform`,
+ `x-accel-buffering: no`, `connection: keep-alive`) are set on every stream
+ response.
+
+### Soak harness
+
+`bun run soak --duration-ms 10000 --runs 50 --sse-connections 3` is the fast
+CI mode: it verifies that no active runs, no stuck streams, and no request
+failures remain at shutdown. The `--manual` flag extends the defaults to a
+~1h soak and emits the run/failure/RSS/event-lag/latency/connection summary
+line for PR evidence.
diff --git a/docs/playbooks/agent-probe-server.md b/docs/playbooks/agent-probe-server.md
index 5e37715..7ecaac8 100644
--- a/docs/playbooks/agent-probe-server.md
+++ b/docs/playbooks/agent-probe-server.md
@@ -308,3 +308,120 @@ The HTTP endpoint:
The dashboard reads `?run_ids=a,b[&only=changes]` so shared deep links survive
refresh, and the preset detail view surfaces a "Compare last two runs" CTA
that pre-selects the two most recent runs for the preset.
+
+## Phase 4: Observability, SSE Hardening, and Operational Polish
+
+### Tracing a request by id
+
+Every HTTP response carries an `x-request-id` header (injected from the
+incoming `x-request-id` when present, generated otherwise). Structured logs
+emit `request_id` on the same line as `method`, `path`, `route`, `status`, and
+`duration_ms`, so an operator can pipe a request id through `grep` to find the
+single lifecycle of a request.
+
+```bash
+curl -fsS \
+ -H "Authorization: Bearer $AGENTPROBE_SERVER_TOKEN" \
+ -H "x-request-id: incident-2026-04-17-01" \
+ http://127.0.0.1:7878/api/runs | cat
+grep incident-2026-04-17-01 server.log
+```
+
+Set `AGENTPROBE_SERVER_LOG_FORMAT=json` (or pass `--log-format json`) to emit
+JSON lines for stream processors. Text remains the default for interactive
+bring-up.
+
+### Metrics, spans, and startup log
+
+- `docs/RELIABILITY.md#server-metrics-spans-and-budgets-phase-4` lists every
+ counter, gauge, and span name the server emits along with the expected
+ labels. Adapters are in-process only — no external collector is required.
+- The server's first log line (`server.startup`) includes the redacted config
+ summary. Tokens and database passwords are replaced with
+ `[redacted]:c`. Use it to confirm bind host, port, CORS origins, and
+ backend without leaking secrets.
+
+### Latency budgets
+
+`bun run latency-budget --samples 25` boots the server against a synthetic data
+root, samples the indexed surfaces, and prints p50/p95/p99 per surface. Use
+`--report-only` to log without failing the process when you are diagnosing
+rather than gating. Shipping budgets live in `docs/RELIABILITY.md`.
+
+### Soak harness
+
+- CI mode: `bun run soak --duration-ms 10000 --runs 50 --sse-connections 3`
+ proves that no active runs, no stuck streams, and no request failures
+ remain when the server stops.
+- Manual mode: `bun run soak --manual` targets about one hour, repeatedly
+ launches synthetic runs, reconnects SSE streams, and browses history. The
+ emitted JSON summary (runs, failures, RSS trend, event lag, request
+ latency, open connections at shutdown) is the PR evidence artifact.
+
+### SSE proxying notes
+
+AgentProbe's SSE endpoint is designed to be safe behind standard reverse
+proxies:
+
+- Every response sets `cache-control: no-store, no-transform`,
+ `x-accel-buffering: no`, and `connection: keep-alive`.
+- A `retry: 2000` directive is emitted on every reconnect so misbehaving
+ client libraries still back off.
+- Heartbeat comments flow every 15 seconds on idle streams so NAT and idle
+ timeouts do not silently sever the connection.
+
+Minimum nginx snippet for a reverse proxy that preserves the contract:
+
+```nginx
+location /api/runs/ {
+ proxy_http_version 1.1;
+ proxy_set_header Connection "";
+ proxy_buffering off;
+ proxy_cache off;
+ proxy_read_timeout 1h;
+ chunked_transfer_encoding on;
+ proxy_pass http://agentprobe/api/runs/;
+}
+```
+
+`proxy_buffering off` and the request header `Connection ""` are load-bearing:
+nginx's default buffered mode will starve the browser's EventSource until the
+buffer fills.
+
+### Backup, restore, and migration recovery
+
+- SQLite: `sqlite3 .agentprobe/runs.sqlite3 ".backup backup.sqlite3"` while the
+ server runs. Restore by stopping the server, copying the backup into place,
+ and starting again.
+- Postgres: use `pg_dump -Fc` daily and store dumps off-host. Restore with
+ `pg_restore -d agentprobe backup.dump` after stopping writers.
+- Migration failure: `agentprobe db:migrate` applies schema changes. On
+ failure, capture the CLI output, roll back to the previous dump, and
+ rerun the migration after addressing the root cause. Phase 4 does not
+ change the migration surface; see the Phase 3 section above for the Postgres
+ boot gate behaviour.
+
+### Dashboard cache behaviour
+
+- `/healthz`, `/readyz`, `/api/session`, `/api/runs`, `/api/runs/:id`, and
+ SSE responses all set `cache-control: no-store` (or equivalent) so
+ reverse proxies never serve stale state to operators.
+- Static assets under the dashboard bundle (`/*.js`, `/*.css`) inherit the
+ Bun static file cache and may be cached on the CDN. Rebuild the dashboard
+ bundle (`bun run dashboard:build`) to invalidate fingerprinted asset URLs.
+- Local storage holds the bearer token under
+ `agentprobe:server-token`; clear it with the Settings view when rotating.
+
+### Dashboard keyboard shortcuts
+
+| Shortcut | Action |
+| --- | --- |
+| `/` | Focus the runs-page search input. |
+| `j` / `k` | Move focus to the next / previous list row. |
+| `g r` | Navigate to Runs. |
+| `g p` | Navigate to Presets. |
+| `g s` | Navigate to Start run. |
+
+Shortcuts are suppressed while typing in `INPUT`, `TEXTAREA`, `SELECT`, or a
+`contenteditable` element, and while any of `Ctrl`, `Meta`, or `Alt` is held.
+Every shortcut-backed action still has a visible nav link or button.
diff --git a/docs/product-specs/current-state.md b/docs/product-specs/current-state.md
index 25571fd..8a2f390 100644
--- a/docs/product-specs/current-state.md
+++ b/docs/product-specs/current-state.md
@@ -35,6 +35,13 @@ Last validated against `platform.md`: 2026-04-17
- [x] Docker image boots safely with SQLite-on-volume persistence
- [x] Database URL credentials stay redacted in operator-visible output
- [x] Docker Compose readiness waits for server readiness
+- [x] Server observability adapters emit counters, gauges, and spans without an external collector
+- [x] Startup and per-request logs redact secrets and preserve request IDs
+- [x] SSE streams emit heartbeats, retry hints, and exactly-once terminal events
+- [x] Dashboard keyboard shortcuts coexist with form typing
+- [x] Dashboard views render empty, error, and loading states for every major surface
+- [x] Latency budget checks run deterministically against seeded local data
+- [x] Soak harness produces CI and manual evidence with the required summary
## Notes
diff --git a/docs/product-specs/e2e-checklist.md b/docs/product-specs/e2e-checklist.md
index e621204..b6d27e9 100644
--- a/docs/product-specs/e2e-checklist.md
+++ b/docs/product-specs/e2e-checklist.md
@@ -32,3 +32,10 @@ Derived from `platform.md`. Every scenario should have a coverage owner.
| Docker image boots safely with SQLite-on-volume persistence | `Dockerfile` + `docker-compose.yml` + `docs/playbooks/agent-probe-server.md` | ✅ covered |
| Database URL credentials stay redacted in operator-visible output | `tests/unit/persistence/url.test.ts` + `tests/unit/server/config.test.ts` | ✅ covered |
| Docker Compose readiness waits for server readiness | `docker-compose.yml` + `docs/playbooks/agent-probe-server.md` + `docker compose config` | ✅ covered |
+| Server observability adapters emit counters, gauges, and spans without an external collector | `tests/unit/server/observability/metrics.test.ts` + `tests/unit/server/observability/spans.test.ts` + `tests/integration/server/sse-reconnect.test.ts` | ✅ covered |
+| Startup and per-request logs redact secrets and preserve request IDs | `tests/unit/server/observability/redaction.test.ts` + `tests/unit/server/observability/logger.test.ts` | ✅ covered |
+| SSE streams emit heartbeats, retry hints, and exactly-once terminal events | `tests/integration/server/sse-reconnect.test.ts` + `tests/unit/server/streams.test.ts` | ✅ covered |
+| Dashboard keyboard shortcuts coexist with form typing | `tests/unit/dashboard/keyboard-shortcuts.test.tsx` | ✅ covered |
+| Dashboard views render empty, error, and loading states for every major surface | `tests/unit/dashboard-app.test.tsx` + `tests/unit/dashboard/compare-view.test.tsx` + manual dashboard pass | ⏳ unit covered; browser pass logged on PR |
+| Latency budget checks run deterministically against seeded local data | `scripts/latency-budget.ts` + `docs/RELIABILITY.md` | ✅ covered |
+| Soak harness produces CI and manual evidence with the required summary | `scripts/soak.ts` + `docs/RELIABILITY.md` + `docs/playbooks/agent-probe-server.md` | ✅ covered |
diff --git a/docs/product-specs/platform.md b/docs/product-specs/platform.md
index aa12328..f185ced 100644
--- a/docs/product-specs/platform.md
+++ b/docs/product-specs/platform.md
@@ -292,3 +292,86 @@ configuration errors that include the database URL
**Then** the output redacts the password component for any URL scheme that
contains credentials, including percent-encoded and reserved password
characters, and never exposes the raw configured password.
+
+### Server observability adapters emit counters, gauges, and spans without an external collector
+
+**Given** an `agentprobe start-server` instance running without any external
+metrics collector configured
+**When** the server handles HTTP traffic, accepts runs, and streams SSE events
+**Then** the in-process registry emits `server.http.requests`,
+`server.runs.started_total`, and `server.runs.finished_total` counters (with
+`method`, `route`, `status`, and `preset` labels where applicable), updates the
+`server.runs.active` and `server.sse.connections` gauges as lifecycle events
+occur, records `server.run.start.validation`, `server.run.controller.execute`,
+and `server.run.suite.boot` spans, and exposes a snapshot API for tests and
+operators without requiring a collector to be installed.
+
+### Startup and per-request logs redact secrets and preserve request IDs
+
+**Given** an `agentprobe start-server` instance configured with a bearer token
+and a database URL that contains credentials
+**When** the server logs its startup configuration, an HTTP request, or a run
+controller lifecycle event
+**Then** the startup log emits a single `server.startup` line that masks
+token-like fields and redacts userinfo credentials, every request log carries
+`method`, `route`, `status`, `duration_ms`, and `request_id`, and run
+controller logs tag each event with `run_id` and `preset_id` so a single
+request or run can be traced through the pipeline by ID.
+
+### SSE streams emit heartbeats, retry hints, and exactly-once terminal events
+
+**Given** a client subscribed to `GET /api/runs/:runId/events` for an active or
+historical run
+**When** the server accepts the connection and the run proceeds to a terminal
+state
+**Then** the server emits a `retry:` directive on connect, periodic heartbeat
+comments on idle streams, replays buffered events after any `Last-Event-ID`
+supplied via the standard header or the `last_event_id` query parameter, sets
+`cache-control: no-store, no-transform`, `x-accel-buffering: no`, and
+`connection: keep-alive` on every response, and emits exactly one terminal
+event (`run_finished`, `run_cancelled`, or `run_failed`) before closing the
+stream even when replaying a historical run whose ring buffer has been
+evicted.
+
+### Dashboard keyboard shortcuts coexist with form typing
+
+**Given** an operator on any dashboard page
+**When** they press `/`, `j`, `k`, or the `g r` / `g p` / `g s` chord
+**Then** the browser focuses the runs search input, moves the keyboard focus
+between list rows, or navigates to the Runs, Presets, or Start-run routes
+respectively; shortcuts are ignored while typing in `INPUT`, `TEXTAREA`,
+`SELECT`, or a `contenteditable` element, ignored when any of `Ctrl`, `Meta`,
+or `Alt` is held, and every shortcut-backed action remains reachable through a
+visible nav link or button.
+
+### Dashboard views render empty, error, and loading states for every major surface
+
+**Given** a dashboard view loading data from the server
+**When** the view is awaiting a response, receives an empty result set,
+receives a filter that does not match any rows, or encounters an HTTP error
+**Then** the runs, presets, compare, settings, suites, and auth surfaces all
+render a dedicated empty, error, or loading affordance rather than a blank
+layout, and the runs page provides a filter input that collapses to an empty
+state explaining that the current filter term returned no rows.
+
+### Latency budget checks run deterministically against seeded local data
+
+**Given** a developer or CI job running `bun run latency-budget`
+**When** the harness boots a loopback server against a synthetic data root and
+samples `GET /`, `GET /api/runs`, `POST /api/runs`, and SSE first-event
+latencies
+**Then** the harness prints per-surface p50/p95/p99 values, compares them
+against the budgets recorded in `docs/RELIABILITY.md`, and exits non-zero when
+any p95 exceeds its budget unless `--report-only` is set.
+
+### Soak harness produces CI and manual evidence with the required summary
+
+**Given** a developer or CI job running `bun run soak`
+**When** the harness executes in default CI mode or with `--manual` for a
+longer window
+**Then** the harness repeatedly starts synthetic runs, reconnects SSE streams,
+and browses history; the CI run verifies that no active runs, no stuck
+streams, and no HTTP failures remain at shutdown; and every mode emits a
+JSON summary with run count, failures, RSS start/end samples, event lag,
+request latency p95, and open-connection counts at shutdown that can be
+attached to PR evidence.
diff --git a/package.json b/package.json
index 0af0131..e51b1f4 100644
--- a/package.json
+++ b/package.json
@@ -20,7 +20,9 @@
"typecheck": "bunx tsc --noEmit && bun run --cwd dashboard typecheck",
"dashboard:dev": "bun run --cwd dashboard dev",
"dashboard:build": "bun run --cwd dashboard build",
- "validate:setup": "./scripts/validate-setup.sh"
+ "validate:setup": "./scripts/validate-setup.sh",
+ "soak": "bun scripts/soak.ts",
+ "latency-budget": "bun scripts/latency-budget.ts"
},
"dependencies": {
"nunjucks": "^3.2.4",
diff --git a/scripts/latency-budget.ts b/scripts/latency-budget.ts
new file mode 100644
index 0000000..3eb23ef
--- /dev/null
+++ b/scripts/latency-budget.ts
@@ -0,0 +1,205 @@
+#!/usr/bin/env bun
+/* eslint-disable no-console */
+import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+
+import {
+ type StartedServer,
+ startAgentProbeServer,
+} from "../src/runtime/server/app-server.ts";
+import { buildServerConfig } from "../src/runtime/server/config.ts";
+
+type SampleKind =
+ | "static.index"
+ | "api.runs.list"
+ | "api.runs.create"
+ | "sse.first_event";
+
+type Budget = {
+ kind: SampleKind;
+ label: string;
+ p95Ms: number;
+};
+
+const BUDGETS: Budget[] = [
+ { kind: "static.index", label: "GET / (index.html)", p95Ms: 150 },
+ { kind: "api.runs.list", label: "GET /api/runs", p95Ms: 150 },
+ {
+ kind: "api.runs.create",
+ label: "POST /api/runs (validation error)",
+ p95Ms: 200,
+ },
+ {
+ kind: "sse.first_event",
+ label: "SSE /api/runs/:id/events first byte",
+ p95Ms: 200,
+ },
+];
+
+function writeMinimalSuite(root: string): string {
+ const data = join(root, "data");
+ mkdirSync(data, { recursive: true });
+ writeFileSync(
+ join(data, "endpoint.yaml"),
+ [
+ "transport: http",
+ "connection:",
+ " base_url: http://example.test",
+ "request:",
+ " method: POST",
+ ' url: "{{ base_url }}/chat"',
+ " body_template: '{}'",
+ "response:",
+ " format: text",
+ ' content_path: "$"',
+ "",
+ ].join("\n"),
+ "utf8",
+ );
+ return data;
+}
+
+function parseFlag(args: string[], name: string, fallback: number): number {
+ const index = args.indexOf(name);
+ if (index === -1) return fallback;
+ const raw = args[index + 1];
+ const parsed = Number(raw);
+ if (!Number.isFinite(parsed) || parsed <= 0) return fallback;
+ return parsed;
+}
+
+function percentile(values: number[], pct: number): number {
+ if (values.length === 0) return 0;
+ const sorted = [...values].sort((a, b) => a - b);
+ const rank = Math.min(sorted.length - 1, Math.ceil(sorted.length * pct) - 1);
+ return sorted[Math.max(0, rank)] ?? 0;
+}
+
+async function timeRequest(
+ fn: () => Promise,
+ samples: number,
+): Promise {
+ const results: number[] = [];
+ for (let i = 0; i < samples; i++) {
+ const t0 = performance.now();
+ try {
+ await fn();
+ } catch {
+ // Errors are expected for some paths (e.g., validation-error runs).
+ }
+ results.push(performance.now() - t0);
+ }
+ return results;
+}
+
+async function measureSseFirstEvent(
+ server: StartedServer,
+ samples: number,
+): Promise {
+ const results: number[] = [];
+ for (let i = 0; i < samples; i++) {
+ const runId = `latency-sse-${i}`;
+ server.streamHub.publish({
+ runId,
+ kind: "run_progress",
+ payload: { kind: "scenario_started" },
+ });
+ const t0 = performance.now();
+ const response = await fetch(`${server.url}/api/runs/${runId}/events`);
+ const reader = response.body?.getReader();
+ if (!reader) continue;
+ await reader.read();
+ const duration = performance.now() - t0;
+ await reader.cancel();
+ results.push(duration);
+ }
+ return results;
+}
+
+async function main(): Promise {
+ const args = process.argv.slice(2);
+ const samples = parseFlag(args, "--samples", 20);
+ const strict = !args.includes("--report-only");
+
+ const root = mkdtempSync(join(tmpdir(), "agentprobe-latency-"));
+ const dataPath = writeMinimalSuite(root);
+ const dbPath = join(root, "runs.sqlite3");
+
+ const server = await startAgentProbeServer(
+ buildServerConfig({
+ args: [
+ "--host",
+ "127.0.0.1",
+ "--port",
+ "0",
+ "--data",
+ dataPath,
+ "--db",
+ dbPath,
+ ],
+ env: {},
+ }),
+ );
+
+ try {
+ const staticSamples = await timeRequest(async () => {
+ const response = await fetch(`${server.url}/`);
+ await response.text();
+ }, samples);
+
+ const listSamples = await timeRequest(async () => {
+ const response = await fetch(`${server.url}/api/runs`);
+ await response.json();
+ }, samples);
+
+ const createSamples = await timeRequest(async () => {
+ const response = await fetch(`${server.url}/api/runs`, {
+ method: "POST",
+ headers: { "content-type": "application/json" },
+ body: "{}",
+ });
+ await response.text();
+ }, samples);
+
+ const sseSamples = await measureSseFirstEvent(server, samples);
+
+ const recordings: Record = {
+ "static.index": staticSamples,
+ "api.runs.list": listSamples,
+ "api.runs.create": createSamples,
+ "sse.first_event": sseSamples,
+ };
+
+ let exceeded = 0;
+ console.log("kind\tlabel\tp50_ms\tp95_ms\tp99_ms\tbudget_ms\tstatus");
+ for (const budget of BUDGETS) {
+ const values = recordings[budget.kind];
+ const p50 = percentile(values, 0.5);
+ const p95 = percentile(values, 0.95);
+ const p99 = percentile(values, 0.99);
+ const ok = p95 <= budget.p95Ms;
+ if (!ok) exceeded += 1;
+ console.log(
+ [
+ budget.kind,
+ budget.label,
+ p50.toFixed(2),
+ p95.toFixed(2),
+ p99.toFixed(2),
+ budget.p95Ms.toString(),
+ ok ? "ok" : "over",
+ ].join("\t"),
+ );
+ }
+
+ if (exceeded > 0 && strict) {
+ console.error(`\n${exceeded} budget(s) exceeded.`);
+ process.exitCode = 1;
+ }
+ } finally {
+ await server.stop();
+ }
+}
+
+void main();
diff --git a/scripts/soak.ts b/scripts/soak.ts
new file mode 100644
index 0000000..3ea15e9
--- /dev/null
+++ b/scripts/soak.ts
@@ -0,0 +1,255 @@
+#!/usr/bin/env bun
+/* eslint-disable no-console */
+import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+
+import {
+ type StartedServer,
+ startAgentProbeServer,
+} from "../src/runtime/server/app-server.ts";
+import { buildServerConfig } from "../src/runtime/server/config.ts";
+import { METRIC_NAMES } from "../src/runtime/server/observability/index.ts";
+
+type Mode = "ci" | "manual";
+
+type SoakOptions = {
+ mode: Mode;
+ durationMs: number;
+ runs: number;
+ sseConnections: number;
+};
+
+function parseOptions(args: string[]): SoakOptions {
+ const mode: Mode = args.includes("--manual") ? "manual" : "ci";
+ const durationFlag = args.indexOf("--duration-ms");
+ const durationMs =
+ durationFlag !== -1
+ ? Math.max(1_000, Number(args[durationFlag + 1] ?? ""))
+ : mode === "manual"
+ ? 60 * 60 * 1_000
+ : 10_000;
+ const runsFlag = args.indexOf("--runs");
+ const runs =
+ runsFlag !== -1
+ ? Math.max(1, Number(args[runsFlag + 1] ?? ""))
+ : mode === "manual"
+ ? 500
+ : 50;
+ const sseFlag = args.indexOf("--sse-connections");
+ const sseConnections =
+ sseFlag !== -1
+ ? Math.max(1, Number(args[sseFlag + 1] ?? ""))
+ : mode === "manual"
+ ? 5
+ : 3;
+ return { mode, durationMs, runs, sseConnections };
+}
+
+function writeMinimalSuite(root: string): string {
+ const data = join(root, "data");
+ mkdirSync(data, { recursive: true });
+ writeFileSync(
+ join(data, "endpoint.yaml"),
+ [
+ "transport: http",
+ "connection:",
+ " base_url: http://example.test",
+ "request:",
+ " method: POST",
+ ' url: "{{ base_url }}/chat"',
+ " body_template: '{}'",
+ "response:",
+ " format: text",
+ ' content_path: "$"',
+ "",
+ ].join("\n"),
+ "utf8",
+ );
+ return data;
+}
+
+type RssSample = { ts: number; rssMb: number };
+
+function rssMb(): number {
+ return process.memoryUsage.rss() / 1024 / 1024;
+}
+
+async function openSse(server: StartedServer, runId: string) {
+ const response = await fetch(`${server.url}/api/runs/${runId}/events`);
+ const reader = response.body?.getReader();
+ if (!reader) return;
+ let firstEventLag = Number.NaN;
+ const start = performance.now();
+ const read = async () => {
+ try {
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+ if (Number.isNaN(firstEventLag) && value && value.length > 0) {
+ firstEventLag = performance.now() - start;
+ }
+ }
+ } catch {
+ // stream ended
+ }
+ };
+ void read();
+ return {
+ cancel: async () => {
+ try {
+ await reader.cancel();
+ } catch {
+ // ignore
+ }
+ },
+ firstEventLag: () => firstEventLag,
+ };
+}
+
+async function runSoak(options: SoakOptions): Promise {
+ const root = mkdtempSync(join(tmpdir(), "agentprobe-soak-"));
+ const dataPath = writeMinimalSuite(root);
+ const dbPath = join(root, "runs.sqlite3");
+ const server = await startAgentProbeServer(
+ buildServerConfig({
+ args: [
+ "--host",
+ "127.0.0.1",
+ "--port",
+ "0",
+ "--data",
+ dataPath,
+ "--db",
+ dbPath,
+ ],
+ env: {},
+ }),
+ );
+
+ const endBy = Date.now() + options.durationMs;
+ let runsStarted = 0;
+ let failures = 0;
+ const rssSamples: RssSample[] = [{ ts: Date.now(), rssMb: rssMb() }];
+
+ const latencies: number[] = [];
+ let openConnections = 0;
+ const sseHandles: Awaited>[] = [];
+
+ const ssePool = async (): Promise => {
+ for (let i = 0; i < options.sseConnections; i++) {
+ const runId = `soak-sse-${i}`;
+ server.streamHub.publish({
+ runId,
+ kind: "run_progress",
+ payload: { kind: "scenario_started" },
+ });
+ const handle = await openSse(server, runId);
+ if (handle) sseHandles.push(handle);
+ openConnections += 1;
+ }
+ };
+
+ await ssePool();
+
+ while (Date.now() < endBy && runsStarted < options.runs) {
+ const runId = `soak-run-${runsStarted}`;
+ const t0 = performance.now();
+ server.streamHub.publish({
+ runId,
+ kind: "run_started",
+ payload: {
+ run_id: runId,
+ label: null,
+ preset_id: null,
+ trigger: "soak",
+ },
+ });
+ // Simulate progress and terminal event.
+ server.streamHub.publish({
+ runId,
+ kind: "scenario_started",
+ payload: { scenario_id: "synthetic" },
+ });
+ server.streamHub.publish({
+ runId,
+ kind: "run_finished",
+ payload: { kind: "run_finished", run_id: runId },
+ });
+ latencies.push(performance.now() - t0);
+ runsStarted += 1;
+
+ if (runsStarted % 25 === 0) {
+ rssSamples.push({ ts: Date.now(), rssMb: rssMb() });
+ }
+
+ try {
+ const response = await fetch(`${server.url}/api/runs`);
+ await response.json();
+ } catch {
+ failures += 1;
+ }
+ // Yield to the event loop so stream subscribers can drain.
+ await new Promise((resolve) => setImmediate(resolve));
+ }
+
+ rssSamples.push({ ts: Date.now(), rssMb: rssMb() });
+
+ const activeRuns = server.observability.metrics.getGauge(
+ METRIC_NAMES.runsActive,
+ );
+ const httpCounters = server.observability.metrics
+ .snapshot()
+ .counters.filter((entry) => entry.name === METRIC_NAMES.httpRequests)
+ .reduce((total, entry) => total + entry.value, 0);
+ const openSseGauge = server.observability.metrics.getGauge(
+ METRIC_NAMES.sseConnections,
+ );
+
+ const firstEventLags = sseHandles
+ .map((handle) => handle?.firstEventLag() ?? Number.NaN)
+ .filter((value) => !Number.isNaN(value));
+ const avgLag =
+ firstEventLags.length > 0
+ ? firstEventLags.reduce((sum, value) => sum + value, 0) /
+ firstEventLags.length
+ : 0;
+ const p95Latency = latencies.length
+ ? ([...latencies].sort((a, b) => a - b)[
+ Math.floor(latencies.length * 0.95)
+ ] ?? 0)
+ : 0;
+
+ for (const handle of sseHandles) {
+ if (handle) await handle.cancel();
+ }
+
+ await server.stop();
+
+ const summary = {
+ mode: options.mode,
+ duration_ms: Date.now() - (endBy - options.durationMs),
+ runs: runsStarted,
+ failures,
+ http_requests: httpCounters,
+ open_sse_connections_at_shutdown: openSseGauge,
+ active_runs_at_shutdown: activeRuns,
+ event_lag_ms_avg: Math.round(avgLag * 100) / 100,
+ request_latency_ms_p95: Math.round(p95Latency * 100) / 100,
+ rss_mb_start: rssSamples[0]?.rssMb ?? 0,
+ rss_mb_end: rssSamples[rssSamples.length - 1]?.rssMb ?? 0,
+ rss_mb_samples: rssSamples.length,
+ };
+
+ console.log(JSON.stringify(summary, null, 2));
+
+ if (activeRuns !== 0) {
+ console.error(
+ `soak harness: expected 0 active runs at shutdown, got ${activeRuns}`,
+ );
+ process.exitCode = 1;
+ }
+ void openConnections;
+}
+
+void runSoak(parseOptions(process.argv.slice(2)));
diff --git a/src/runtime/server/app-server.ts b/src/runtime/server/app-server.ts
index 0bcbf62..f514238 100644
--- a/src/runtime/server/app-server.ts
+++ b/src/runtime/server/app-server.ts
@@ -17,6 +17,13 @@ import { PresetController } from "./controllers/preset-controller.ts";
import { RunController } from "./controllers/run-controller.ts";
import { SuiteController } from "./controllers/suite-controller.ts";
import { ensureRequestId, errorResponse } from "./http-helpers.ts";
+import {
+ createObservability,
+ type Logger,
+ METRIC_NAMES,
+ type Observability,
+ summarizeServerConfig,
+} from "./observability/index.ts";
import { handleCompareRuns } from "./routes/comparisons.ts";
import { handleHealthz, handleReadyz, handleSession } from "./routes/health.ts";
import {
@@ -58,6 +65,7 @@ export type ServerContext = {
comparisonController: ComparisonController;
repository: PersistenceRepository;
streamHub: StreamHub;
+ observability: Observability;
requestId: string;
startedAt: number;
version: string;
@@ -69,6 +77,7 @@ export type StartedServer = {
port: number;
streamHub: StreamHub;
suiteController: SuiteController;
+ observability: Observability;
stop: () => Promise;
};
@@ -332,37 +341,36 @@ function withCorsHeaders(
}
function logRequest(
- config: ServerConfig,
+ logger: Logger,
request: Request,
response: Response,
durationMs: number,
requestId: string,
+ matchedRoute: string | undefined,
): void {
const pathname = new URL(request.url).pathname;
- if (config.logFormat === "json") {
- const payload = {
- ts: new Date().toISOString(),
- level: "info",
- component: "agentprobe.server",
- method: request.method,
- path: pathname,
- status: response.status,
- duration_ms: Math.round(durationMs),
- request_id: requestId,
- };
- process.stderr.write(`${JSON.stringify(payload)}\n`);
- return;
- }
- process.stderr.write(
- `[server] ${request.method} ${pathname} -> ${response.status} (${durationMs.toFixed(
- 1,
- )}ms) rid=${requestId}\n`,
- );
+ logger.info("http.request", {
+ method: request.method,
+ path: pathname,
+ route: matchedRoute ?? null,
+ status: response.status,
+ duration_ms: Math.round(durationMs),
+ request_id: requestId,
+ });
}
+export type StartAgentProbeServerOptions = {
+ observability?: Observability;
+};
+
export async function startAgentProbeServer(
config: ServerConfig,
+ options: StartAgentProbeServerOptions = {},
): Promise {
+ const observability =
+ options.observability ?? createObservability({ format: config.logFormat });
+ const { logger, metrics } = observability;
+
const repository: RecordingRepository = createRecordingRepository(
config.dbUrl,
);
@@ -381,11 +389,19 @@ export async function startAgentProbeServer(
repository,
suiteController,
streamHub,
+ observability,
});
const comparisonController = createComparisonController({ repository });
const routes = buildRoutes();
const startedAt = Date.now();
+ logger.info("server.startup", {
+ version: SERVER_VERSION,
+ config: summarizeServerConfig(config),
+ });
+ metrics.setGauge(METRIC_NAMES.runsActive, 0);
+ metrics.setGauge(METRIC_NAMES.sseConnections, 0);
+
const baseContext = {
config,
presetController,
@@ -394,6 +410,7 @@ export async function startAgentProbeServer(
comparisonController,
repository,
streamHub,
+ observability,
startedAt,
version: SERVER_VERSION,
};
@@ -403,13 +420,16 @@ export async function startAgentProbeServer(
const url = new URL(request.url);
const t0 = performance.now();
let response: Response;
+ let routeLabel: string | undefined;
const context: ServerContext = { ...baseContext, requestId };
try {
if (request.method === "OPTIONS" && isApiPath(url.pathname)) {
response = preflightResponse(request, config);
+ routeLabel = "OPTIONS";
} else {
const matched = matchRoute(routes, request.method, url.pathname);
if (matched) {
+ routeLabel = matched.route.pattern.source;
if (matched.route.requiresAuth && config.token) {
if (!verifyBearerToken(request, config.token)) {
response = errorResponse({
@@ -466,7 +486,13 @@ export async function startAgentProbeServer(
response = withCorsHeaders(request, config, response);
}
- logRequest(config, request, response, performance.now() - t0, requestId);
+ const duration = performance.now() - t0;
+ logRequest(logger, request, response, duration, requestId, routeLabel);
+ metrics.incrementCounter(METRIC_NAMES.httpRequests, 1, {
+ method: request.method,
+ route: routeLabel ?? "unmatched",
+ status: response.status,
+ });
return response;
};
@@ -491,6 +517,7 @@ export async function startAgentProbeServer(
port,
streamHub,
suiteController,
+ observability,
stop,
};
}
diff --git a/src/runtime/server/controllers/run-controller.ts b/src/runtime/server/controllers/run-controller.ts
index 460c538..5aafce3 100644
--- a/src/runtime/server/controllers/run-controller.ts
+++ b/src/runtime/server/controllers/run-controller.ts
@@ -14,6 +14,14 @@ import type {
RunProgressEvent,
} from "../../../shared/types/contracts.ts";
import { AgentProbeConfigError } from "../../../shared/utils/errors.ts";
+import {
+ type Logger,
+ METRIC_NAMES,
+ type MetricsRegistry,
+ type Observability,
+ SPAN_NAMES,
+ type SpanRecorder,
+} from "../observability/index.ts";
import type { StreamHub } from "../streams/hub.ts";
import {
HttpInputError,
@@ -163,14 +171,32 @@ function parseOverrides(
export class RunController {
private readonly activeByRunId = new Map();
private readonly activeBySuiteKey = new Map();
+ private readonly logger: Logger;
+ private readonly metrics: MetricsRegistry | undefined;
+ private readonly spans: SpanRecorder | undefined;
constructor(
private readonly options: {
repository: RecordingRepository;
suiteController: SuiteController;
streamHub: StreamHub;
+ observability?: Observability;
},
- ) {}
+ ) {
+ this.logger = options.observability
+ ? options.observability.logger.child("agentprobe.run", {})
+ : ({
+ log: () => {},
+ info: () => {},
+ warn: () => {},
+ error: () => {},
+ child() {
+ return this;
+ },
+ } as Logger);
+ this.metrics = options.observability?.metrics;
+ this.spans = options.observability?.spans;
+ }
assertRunnable(): void {
ensureOpenRouterConfigured();
@@ -309,15 +335,32 @@ export class RunController {
}
start(spec: RunSpec): StartRunResult {
- const client = ensureOpenRouterConfigured();
+ const validationScope = this.spans?.start(SPAN_NAMES.runStartValidation, {
+ preset_id: spec.presetId ?? null,
+ });
+ let client: OpenAiResponsesClient;
+ try {
+ client = ensureOpenRouterConfigured();
+ } catch (error) {
+ validationScope?.setStatus(
+ "error",
+ error instanceof Error ? error : new Error(String(error)),
+ );
+ validationScope?.end();
+ throw error;
+ }
const suiteKey = this.suiteKey(spec);
if (this.activeBySuiteKey.has(suiteKey)) {
+ validationScope?.setStatus("error", new Error("conflict"));
+ validationScope?.end();
throw new HttpInputError(
409,
"conflict",
"A run with the same resolved suite key is already active.",
);
}
+ validationScope?.setStatus("ok");
+ validationScope?.end();
const abortController = new AbortController();
const recorder = this.options.repository.createRecorder();
@@ -345,9 +388,28 @@ export class RunController {
};
this.activeByRunId.set(runId, active);
this.activeBySuiteKey.set(suiteKey, active);
+
+ this.metrics?.incrementCounter(METRIC_NAMES.runsStartedTotal, 1, {
+ preset: spec.presetId ?? "none",
+ });
+ this.metrics?.adjustGauge(METRIC_NAMES.runsActive, 1);
+ this.logger.info("run.started", {
+ run_id: runId,
+ preset_id: spec.presetId ?? null,
+ label: spec.label ?? null,
+ });
+
void promise.finally(() => {
this.activeByRunId.delete(runId);
this.activeBySuiteKey.delete(suiteKey);
+ this.metrics?.adjustGauge(METRIC_NAMES.runsActive, -1);
+ this.metrics?.incrementCounter(METRIC_NAMES.runsFinishedTotal, 1, {
+ preset: spec.presetId ?? "none",
+ });
+ this.logger.info("run.finished", {
+ run_id: runId,
+ preset_id: spec.presetId ?? null,
+ });
});
this.options.streamHub.publish({
@@ -373,6 +435,21 @@ export class RunController {
suiteKey: string;
},
): Promise {
+ const executeScope = this.spans?.start(SPAN_NAMES.runControllerExecute, {
+ preset_id: spec.presetId ?? null,
+ dry_run: spec.dryRun,
+ repeat: spec.repeat,
+ });
+ const bootScope = this.spans?.start(SPAN_NAMES.runSuiteBoot, {
+ preset_id: spec.presetId ?? null,
+ });
+ let bootEnded = false;
+ const completeBoot = (): void => {
+ if (bootEnded) return;
+ bootEnded = true;
+ bootScope?.setStatus("ok");
+ bootScope?.end();
+ };
try {
await runSuite({
endpoint: spec.endpoint,
@@ -391,6 +468,12 @@ export class RunController {
if (!runId) {
return;
}
+ if (
+ event.kind === "suite_started" ||
+ event.kind === "scenario_started"
+ ) {
+ completeBoot();
+ }
this.options.streamHub.publish({
runId,
kind:
@@ -416,6 +499,12 @@ export class RunController {
const runId = options.recorder.runId;
const failure = normalizeError(error);
writeRunExecutorErrorLog(runId, failure);
+ this.logger.error("run.error", {
+ run_id: runId ?? null,
+ preset_id: spec.presetId ?? null,
+ error_type: failure.name || "Error",
+ error_message: failure.message,
+ });
if (runId) {
try {
options.recorder.recordRunError(failure, {
@@ -436,6 +525,11 @@ export class RunController {
},
});
}
+ executeScope?.setStatus("error", failure);
+ bootScope?.setStatus("error", failure);
+ } finally {
+ completeBoot();
+ executeScope?.end();
}
}
diff --git a/src/runtime/server/observability/index.ts b/src/runtime/server/observability/index.ts
new file mode 100644
index 0000000..ff974cb
--- /dev/null
+++ b/src/runtime/server/observability/index.ts
@@ -0,0 +1,48 @@
+import type { LogFormat } from "../config.ts";
+import { createLogger, type Logger } from "./logger.ts";
+import { MetricsRegistry, SERVER_METRIC_NAMES } from "./metrics.ts";
+import { SERVER_SPAN_NAMES, SpanRecorder } from "./spans.ts";
+
+export type Observability = {
+ logger: Logger;
+ metrics: MetricsRegistry;
+ spans: SpanRecorder;
+};
+
+export function createObservability(options: {
+ format: LogFormat;
+ component?: string;
+ metrics?: MetricsRegistry;
+ spans?: SpanRecorder;
+}): Observability {
+ const logger = createLogger({
+ component: options.component ?? "agentprobe.server",
+ format: options.format,
+ });
+ return {
+ logger,
+ metrics: options.metrics ?? new MetricsRegistry(),
+ spans: options.spans ?? new SpanRecorder(),
+ };
+}
+
+export type { LogFields, Logger, LogLevel } from "./logger.ts";
+export { createLogger } from "./logger.ts";
+export type {
+ CounterSnapshot,
+ GaugeSnapshot,
+ MetricLabels,
+ MetricsSnapshot,
+} from "./metrics.ts";
+export { MetricsRegistry, SERVER_METRIC_NAMES } from "./metrics.ts";
+export {
+ isSecretKey,
+ redactRecord,
+ redactSecretValue,
+ summarizeServerConfig,
+} from "./redaction.ts";
+export type { SpanRecord, SpanScope } from "./spans.ts";
+export { SERVER_SPAN_NAMES, SpanRecorder } from "./spans.ts";
+
+export const METRIC_NAMES = SERVER_METRIC_NAMES;
+export const SPAN_NAMES = SERVER_SPAN_NAMES;
diff --git a/src/runtime/server/observability/logger.ts b/src/runtime/server/observability/logger.ts
new file mode 100644
index 0000000..e06d482
--- /dev/null
+++ b/src/runtime/server/observability/logger.ts
@@ -0,0 +1,97 @@
+import type { LogFormat } from "../config.ts";
+
+export type LogLevel = "debug" | "info" | "warn" | "error";
+
+export type LogFields = Record;
+
+export type Logger = {
+ log(level: LogLevel, event: string, fields?: LogFields): void;
+ info(event: string, fields?: LogFields): void;
+ warn(event: string, fields?: LogFields): void;
+ error(event: string, fields?: LogFields): void;
+ child(component: string, baseFields?: LogFields): Logger;
+};
+
+type Sink = (line: string) => void;
+
+const LEVEL_LABEL: Record = {
+ debug: "debug",
+ info: "info",
+ warn: "warn",
+ error: "error",
+};
+
+function defaultSink(line: string): void {
+ process.stderr.write(`${line}\n`);
+}
+
+function formatText(
+ level: LogLevel,
+ component: string,
+ event: string,
+ fields: LogFields,
+): string {
+ const parts: string[] = [`[${component}]`, `${LEVEL_LABEL[level]}`, event];
+ for (const [key, value] of Object.entries(fields)) {
+ parts.push(`${key}=${formatTextValue(value)}`);
+ }
+ return parts.join(" ");
+}
+
+function formatTextValue(value: unknown): string {
+ if (value === null || value === undefined) return "-";
+ if (typeof value === "string") {
+ return value.includes(" ") ? `"${value.replace(/"/g, '\\"')}"` : value;
+ }
+ if (typeof value === "number" || typeof value === "boolean") {
+ return String(value);
+ }
+ return JSON.stringify(value);
+}
+
+export function createLogger(options: {
+ component: string;
+ format: LogFormat;
+ baseFields?: LogFields;
+ sink?: Sink;
+}): Logger {
+ const sink = options.sink ?? defaultSink;
+ const baseFields = { ...(options.baseFields ?? {}) };
+ const component = options.component;
+ const format = options.format;
+
+ const log = (
+ level: LogLevel,
+ event: string,
+ fields: LogFields = {},
+ ): void => {
+ const merged = { ...baseFields, ...fields };
+ if (format === "json") {
+ const payload = {
+ ts: new Date().toISOString(),
+ level: LEVEL_LABEL[level],
+ component,
+ event,
+ ...merged,
+ };
+ sink(JSON.stringify(payload));
+ } else {
+ sink(formatText(level, component, event, merged));
+ }
+ };
+
+ return {
+ log,
+ info: (event, fields) => log("info", event, fields),
+ warn: (event, fields) => log("warn", event, fields),
+ error: (event, fields) => log("error", event, fields),
+ child(childComponent: string, childFields?: LogFields): Logger {
+ return createLogger({
+ component: childComponent,
+ format,
+ baseFields: { ...baseFields, ...(childFields ?? {}) },
+ sink,
+ });
+ },
+ };
+}
diff --git a/src/runtime/server/observability/metrics.ts b/src/runtime/server/observability/metrics.ts
new file mode 100644
index 0000000..21454f1
--- /dev/null
+++ b/src/runtime/server/observability/metrics.ts
@@ -0,0 +1,142 @@
+export type MetricLabels = Readonly>;
+
+export type CounterSnapshot = {
+ name: string;
+ value: number;
+ labels: Record;
+};
+
+export type GaugeSnapshot = CounterSnapshot;
+
+export type MetricsSnapshot = {
+ counters: CounterSnapshot[];
+ gauges: GaugeSnapshot[];
+};
+
+function labelKey(labels: MetricLabels | undefined): string {
+ if (!labels) return "";
+ const entries = Object.entries(labels)
+ .map(([k, v]) => [k, String(v)] as const)
+ .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
+ return entries.map(([k, v]) => `${k}=${v}`).join("|");
+}
+
+function cloneLabels(labels: MetricLabels | undefined): MetricLabels {
+ if (!labels) return {};
+ return { ...labels };
+}
+
+export class MetricsRegistry {
+ private readonly counters = new Map>();
+ private readonly counterLabels = new Map>();
+ private readonly gauges = new Map>();
+ private readonly gaugeLabels = new Map>();
+
+ incrementCounter(name: string, value = 1, labels?: MetricLabels): void {
+ const key = labelKey(labels);
+ let perName = this.counters.get(name);
+ if (!perName) {
+ perName = new Map();
+ this.counters.set(name, perName);
+ }
+ perName.set(key, (perName.get(key) ?? 0) + value);
+ let labelMap = this.counterLabels.get(name);
+ if (!labelMap) {
+ labelMap = new Map();
+ this.counterLabels.set(name, labelMap);
+ }
+ if (!labelMap.has(key)) {
+ labelMap.set(key, cloneLabels(labels));
+ }
+ }
+
+ setGauge(name: string, value: number, labels?: MetricLabels): void {
+ const key = labelKey(labels);
+ let perName = this.gauges.get(name);
+ if (!perName) {
+ perName = new Map();
+ this.gauges.set(name, perName);
+ }
+ perName.set(key, value);
+ let labelMap = this.gaugeLabels.get(name);
+ if (!labelMap) {
+ labelMap = new Map();
+ this.gaugeLabels.set(name, labelMap);
+ }
+ if (!labelMap.has(key)) {
+ labelMap.set(key, cloneLabels(labels));
+ }
+ }
+
+ adjustGauge(name: string, delta: number, labels?: MetricLabels): void {
+ const key = labelKey(labels);
+ let perName = this.gauges.get(name);
+ if (!perName) {
+ perName = new Map();
+ this.gauges.set(name, perName);
+ }
+ perName.set(key, (perName.get(key) ?? 0) + delta);
+ let labelMap = this.gaugeLabels.get(name);
+ if (!labelMap) {
+ labelMap = new Map();
+ this.gaugeLabels.set(name, labelMap);
+ }
+ if (!labelMap.has(key)) {
+ labelMap.set(key, cloneLabels(labels));
+ }
+ }
+
+ getCounter(name: string, labels?: MetricLabels): number {
+ const key = labelKey(labels);
+ return this.counters.get(name)?.get(key) ?? 0;
+ }
+
+ getGauge(name: string, labels?: MetricLabels): number {
+ const key = labelKey(labels);
+ return this.gauges.get(name)?.get(key) ?? 0;
+ }
+
+ snapshot(): MetricsSnapshot {
+ const counters: CounterSnapshot[] = [];
+ for (const [name, perName] of this.counters.entries()) {
+ const labelMap = this.counterLabels.get(name);
+ for (const [key, value] of perName.entries()) {
+ counters.push({
+ name,
+ value,
+ labels: { ...(labelMap?.get(key) ?? {}) },
+ });
+ }
+ }
+ const gauges: GaugeSnapshot[] = [];
+ for (const [name, perName] of this.gauges.entries()) {
+ const labelMap = this.gaugeLabels.get(name);
+ for (const [key, value] of perName.entries()) {
+ gauges.push({
+ name,
+ value,
+ labels: { ...(labelMap?.get(key) ?? {}) },
+ });
+ }
+ }
+ return {
+ counters: counters.sort((a, b) => a.name.localeCompare(b.name)),
+ gauges: gauges.sort((a, b) => a.name.localeCompare(b.name)),
+ };
+ }
+
+ reset(): void {
+ this.counters.clear();
+ this.counterLabels.clear();
+ this.gauges.clear();
+ this.gaugeLabels.clear();
+ }
+}
+
+export const SERVER_METRIC_NAMES = {
+ httpRequests: "server.http.requests",
+ runsActive: "server.runs.active",
+ runsStartedTotal: "server.runs.started_total",
+ runsFinishedTotal: "server.runs.finished_total",
+ sseConnections: "server.sse.connections",
+} as const;
diff --git a/src/runtime/server/observability/redaction.ts b/src/runtime/server/observability/redaction.ts
new file mode 100644
index 0000000..54734e0
--- /dev/null
+++ b/src/runtime/server/observability/redaction.ts
@@ -0,0 +1,53 @@
+import { redactDbUrl } from "../../../providers/persistence/url.ts";
+import type { ServerConfig } from "../config.ts";
+
+const REDACTED = "[redacted]";
+const SECRET_KEY_PATTERN =
+ /(token|secret|key|password|authorization|api[_-]?key)/i;
+
+export function redactSecretValue(value: string | undefined | null): string {
+ if (value === undefined || value === null || value === "") {
+ return "";
+ }
+ if (value.length <= 4) {
+ return REDACTED;
+ }
+ return `${REDACTED}:${value.length}c`;
+}
+
+export function isSecretKey(key: string): boolean {
+ return SECRET_KEY_PATTERN.test(key);
+}
+
+export function redactRecord(
+ record: Record,
+): Record {
+ const out: Record = {};
+ for (const [key, value] of Object.entries(record)) {
+ if (typeof value === "string" && isSecretKey(key)) {
+ out[key] = redactSecretValue(value);
+ } else if (value && typeof value === "object" && !Array.isArray(value)) {
+ out[key] = redactRecord(value as Record);
+ } else {
+ out[key] = value;
+ }
+ }
+ return out;
+}
+
+export function summarizeServerConfig(
+ config: ServerConfig,
+): Record {
+ return {
+ host: config.host,
+ port: config.port,
+ data_path: config.dataPath,
+ db_url: redactDbUrl(config.dbUrl),
+ dashboard_dist: config.dashboardDist ?? null,
+ token: config.token ? redactSecretValue(config.token) : null,
+ cors_origins: config.corsOrigins,
+ unsafe_expose: config.unsafeExpose,
+ open_browser: config.openBrowser,
+ log_format: config.logFormat,
+ };
+}
diff --git a/src/runtime/server/observability/spans.ts b/src/runtime/server/observability/spans.ts
new file mode 100644
index 0000000..1bd8de5
--- /dev/null
+++ b/src/runtime/server/observability/spans.ts
@@ -0,0 +1,108 @@
+export type SpanRecord = {
+ name: string;
+ startedAt: number;
+ durationMs: number;
+ attributes: Record;
+ status: "ok" | "error";
+ error?: { type: string; message: string };
+};
+
+export type SpanScope = {
+ setAttribute(key: string, value: unknown): void;
+ setStatus(status: "ok" | "error", error?: Error): void;
+ end(): void;
+ readonly name: string;
+ readonly startedAt: number;
+};
+
+export class SpanRecorder {
+ private readonly records: SpanRecord[] = [];
+ private readonly capacity: number;
+
+ constructor(options: { capacity?: number } = {}) {
+ this.capacity = options.capacity ?? 1024;
+ }
+
+ start(name: string, attributes: Record = {}): SpanScope {
+ const startedAt = performance.now();
+ const attrs: Record = { ...attributes };
+ let status: "ok" | "error" = "ok";
+ let recordedError: Error | undefined;
+ let ended = false;
+
+ const finalize = (): void => {
+ if (ended) return;
+ ended = true;
+ const record: SpanRecord = {
+ name,
+ startedAt,
+ durationMs: performance.now() - startedAt,
+ attributes: { ...attrs },
+ status,
+ };
+ if (status === "error" && recordedError) {
+ record.error = {
+ type: recordedError.name || "Error",
+ message: recordedError.message,
+ };
+ }
+ this.records.push(record);
+ if (this.records.length > this.capacity) {
+ this.records.splice(0, this.records.length - this.capacity);
+ }
+ };
+
+ return {
+ name,
+ startedAt,
+ setAttribute(key: string, value: unknown): void {
+ attrs[key] = value;
+ },
+ setStatus(next: "ok" | "error", error?: Error): void {
+ status = next;
+ if (error) {
+ recordedError = error;
+ }
+ },
+ end: finalize,
+ };
+ }
+
+ async withSpan(
+ name: string,
+ attributes: Record,
+ fn: (scope: SpanScope) => Promise | T,
+ ): Promise {
+ const scope = this.start(name, attributes);
+ try {
+ const result = await fn(scope);
+ scope.setStatus("ok");
+ return result;
+ } catch (error) {
+ scope.setStatus(
+ "error",
+ error instanceof Error ? error : new Error(String(error)),
+ );
+ throw error;
+ } finally {
+ scope.end();
+ }
+ }
+
+ snapshot(): SpanRecord[] {
+ return this.records.map((record) => ({
+ ...record,
+ attributes: { ...record.attributes },
+ }));
+ }
+
+ reset(): void {
+ this.records.length = 0;
+ }
+}
+
+export const SERVER_SPAN_NAMES = {
+ runStartValidation: "server.run.start.validation",
+ runControllerExecute: "server.run.controller.execute",
+ runSuiteBoot: "server.run.suite.boot",
+} as const;
diff --git a/src/runtime/server/routes/sse.ts b/src/runtime/server/routes/sse.ts
index 009bb60..5a1f09d 100644
--- a/src/runtime/server/routes/sse.ts
+++ b/src/runtime/server/routes/sse.ts
@@ -1,13 +1,17 @@
import type { RunRecord } from "../../../shared/types/contracts.ts";
import type { ServerContext } from "../app-server.ts";
import { errorResponse } from "../http-helpers.ts";
+import { METRIC_NAMES } from "../observability/index.ts";
import {
formatSseEvent,
formatSseKeepalive,
+ formatSseRetry,
+ isTerminalEvent,
type RunEvent,
} from "../streams/events.ts";
-const KEEPALIVE_INTERVAL_MS = 15_000;
+export const SSE_KEEPALIVE_INTERVAL_MS = 15_000;
+export const SSE_RECONNECT_RETRY_MS = 2_000;
function snapshotPayloadForRun(run: RunRecord): RunEvent["payload"] {
return {
@@ -33,7 +37,9 @@ function snapshotPayloadForRun(run: RunRecord): RunEvent["payload"] {
};
}
-function parseLastEventId(value: string | null): number | undefined {
+function parseLastEventId(
+ value: string | null | undefined,
+): number | undefined {
if (!value) {
return undefined;
}
@@ -44,22 +50,38 @@ function parseLastEventId(value: string | null): number | undefined {
return parsed;
}
+function pickLastEventId(request: Request, url: URL): number | undefined {
+ const headerValue =
+ request.headers.get("last-event-id") ??
+ request.headers.get("Last-Event-ID");
+ const queryValue = url.searchParams.get("last_event_id");
+ return parseLastEventId(headerValue) ?? parseLastEventId(queryValue);
+}
+
+function terminalEventForHistorical(
+ run: RunRecord,
+): RunEvent["kind"] | undefined {
+ if (run.status === "running") return undefined;
+ if (run.status === "cancelled") return "run_cancelled";
+ if (run.status === "errored" || run.status === "failed") return "run_failed";
+ return "run_finished";
+}
+
export async function handleRunSse(
request: Request,
context: ServerContext,
params: { runId: string },
): Promise {
- const lastEventId = parseLastEventId(request.headers.get("last-event-id"));
+ const url = new URL(request.url);
+ const lastEventId = pickLastEventId(request, url);
const { runId } = params;
const historicalRun: RunRecord | undefined = context.config.dbUrl
? await context.repository.getRun(runId)
: undefined;
- // Replay any buffered events (after last-event-id if provided).
const replayEvents = context.streamHub.replay(runId, lastEventId);
- // If neither a buffered stream nor a historical run exist, treat as 404.
if (!historicalRun && replayEvents.length === 0) {
return errorResponse({
status: 404,
@@ -70,12 +92,16 @@ export async function handleRunSse(
}
const encoder = new TextEncoder();
+ const metrics = context.observability.metrics;
let unsubscribe: (() => void) | undefined;
let keepalive: ReturnType | undefined;
+ let teardown: (() => void) | undefined;
const stream = new ReadableStream({
start(controller) {
let closed = false;
+ let connectionCounted = false;
+ let terminalSent = false;
const safeEnqueue = (chunk: string): void => {
try {
controller.enqueue(encoder.encode(chunk));
@@ -96,7 +122,12 @@ export async function handleRunSse(
clearInterval(keepalive);
keepalive = undefined;
}
+ if (connectionCounted) {
+ metrics.adjustGauge(METRIC_NAMES.sseConnections, -1);
+ connectionCounted = false;
+ }
};
+ teardown = cleanup;
const close = (): void => {
cleanup();
try {
@@ -106,13 +137,39 @@ export async function handleRunSse(
}
};
+ metrics.adjustGauge(METRIC_NAMES.sseConnections, 1);
+ connectionCounted = true;
+
+ // Always advise reconnect retry interval to the browser.
+ safeEnqueue(formatSseRetry(SSE_RECONNECT_RETRY_MS));
+
+ const dispatchEvent = (event: RunEvent): void => {
+ safeEnqueue(formatSseEvent(event));
+ if (isTerminalEvent(event) && !terminalSent) {
+ terminalSent = true;
+ queueMicrotask(close);
+ }
+ };
+
if (replayEvents.length > 0) {
for (const event of replayEvents) {
- safeEnqueue(formatSseEvent(event));
+ dispatchEvent(event);
}
- if (historicalRun && historicalRun.status !== "running") {
- queueMicrotask(close);
- return;
+ if (!terminalSent && historicalRun) {
+ const terminalKind = terminalEventForHistorical(historicalRun);
+ if (terminalKind) {
+ const terminalEvent = context.streamHub.publish({
+ runId,
+ kind: terminalKind,
+ payload: {
+ run_id: runId,
+ source: "historical_terminal",
+ status: historicalRun.status,
+ },
+ });
+ dispatchEvent(terminalEvent);
+ return;
+ }
}
} else if (historicalRun) {
const snapshot = context.streamHub.publish({
@@ -120,20 +177,32 @@ export async function handleRunSse(
kind: "snapshot",
payload: snapshotPayloadForRun(historicalRun),
});
- safeEnqueue(formatSseEvent(snapshot));
- if (historicalRun.status !== "running") {
- queueMicrotask(close);
+ dispatchEvent(snapshot);
+ const terminalKind = terminalEventForHistorical(historicalRun);
+ if (terminalKind && !terminalSent) {
+ const terminalEvent = context.streamHub.publish({
+ runId,
+ kind: terminalKind,
+ payload: {
+ run_id: runId,
+ source: "historical_terminal",
+ status: historicalRun.status,
+ },
+ });
+ dispatchEvent(terminalEvent);
return;
}
}
+ if (terminalSent) return;
+
unsubscribe = context.streamHub.subscribe(runId, (event) => {
- safeEnqueue(formatSseEvent(event));
+ dispatchEvent(event);
});
keepalive = setInterval(() => {
safeEnqueue(formatSseKeepalive());
- }, KEEPALIVE_INTERVAL_MS);
+ }, SSE_KEEPALIVE_INTERVAL_MS);
if (request.signal) {
request.signal.addEventListener("abort", () => {
@@ -142,11 +211,8 @@ export async function handleRunSse(
}
},
cancel() {
- if (unsubscribe) {
- unsubscribe();
- }
- if (keepalive) {
- clearInterval(keepalive);
+ if (teardown) {
+ teardown();
}
},
});
diff --git a/src/runtime/server/streams/events.ts b/src/runtime/server/streams/events.ts
index f99f19f..f6af637 100644
--- a/src/runtime/server/streams/events.ts
+++ b/src/runtime/server/streams/events.ts
@@ -7,12 +7,23 @@ export type RunEventKind =
| "run_progress"
| "run_finished"
| "run_cancelled"
+ | "run_failed"
| "run_error"
| "scenario_started"
| "scenario_finished"
| "scenario_error"
| "log";
+export const TERMINAL_EVENT_KINDS: ReadonlySet = new Set([
+ "run_finished",
+ "run_cancelled",
+ "run_failed",
+]);
+
+export function isTerminalEvent(event: RunEvent): boolean {
+ return TERMINAL_EVENT_KINDS.has(event.kind);
+}
+
export type RunEvent = {
id: number;
runId: string;
@@ -35,3 +46,7 @@ export function formatSseEvent(event: RunEvent): string {
export function formatSseKeepalive(): string {
return `: keepalive ${new Date().toISOString()}\n\n`;
}
+
+export function formatSseRetry(retryMs: number): string {
+ return `retry: ${Math.max(0, Math.floor(retryMs))}\n\n`;
+}
diff --git a/tests/integration/server/sse-reconnect.test.ts b/tests/integration/server/sse-reconnect.test.ts
new file mode 100644
index 0000000..f3c8153
--- /dev/null
+++ b/tests/integration/server/sse-reconnect.test.ts
@@ -0,0 +1,162 @@
+import { afterEach, describe, expect, test } from "bun:test";
+import { mkdirSync, writeFileSync } from "node:fs";
+import { join } from "node:path";
+
+import {
+ type StartedServer,
+ startAgentProbeServer,
+} from "../../../src/runtime/server/app-server.ts";
+import { buildServerConfig } from "../../../src/runtime/server/config.ts";
+import { METRIC_NAMES } from "../../../src/runtime/server/observability/index.ts";
+import { makeTempDir } from "../../unit/support.ts";
+
+function writeMinimalData(root: string): string {
+ const data = join(root, "data");
+ mkdirSync(data, { recursive: true });
+ writeFileSync(
+ join(data, "endpoint.yaml"),
+ [
+ "transport: http",
+ "connection:",
+ " base_url: http://example.test",
+ "request:",
+ " method: POST",
+ ' url: "{{ base_url }}/chat"',
+ " body_template: '{}'",
+ "response:",
+ " format: text",
+ ' content_path: "$"',
+ "",
+ ].join("\n"),
+ "utf8",
+ );
+ return data;
+}
+
+async function readEnoughBytes(
+ reader: ReadableStreamDefaultReader>,
+ decoder: TextDecoder,
+ predicate: (collected: string) => boolean,
+ maxChunks = 10,
+): Promise {
+ let collected = "";
+ for (let i = 0; i < maxChunks; i++) {
+ const result = await reader.read();
+ if (result.done) break;
+ collected += decoder.decode(result.value, { stream: true });
+ if (predicate(collected)) break;
+ }
+ return collected;
+}
+
+async function startServer(servers: StartedServer[]): Promise {
+ const root = makeTempDir("sse-reconnect");
+ const data = writeMinimalData(root);
+ const dbPath = join(root, "runs.sqlite3");
+ const args = [
+ "--host",
+ "127.0.0.1",
+ "--port",
+ "0",
+ "--data",
+ data,
+ "--db",
+ dbPath,
+ ];
+ const server = await startAgentProbeServer(
+ buildServerConfig({ args, env: {} }),
+ );
+ servers.push(server);
+ return server;
+}
+
+describe("sse hardening", () => {
+ const servers: StartedServer[] = [];
+
+ afterEach(async () => {
+ for (const server of servers.splice(0)) {
+ await server.stop();
+ }
+ });
+
+ test("replays missed events using last_event_id query and emits terminal close", async () => {
+ const server = await startServer(servers);
+ const runId = "run-reconnect";
+
+ server.streamHub.publish({
+ runId,
+ kind: "run_started",
+ payload: { run_id: runId, label: null, preset_id: null, trigger: "test" },
+ });
+ server.streamHub.publish({
+ runId,
+ kind: "run_progress",
+ payload: { kind: "scenario_started", scenario_id: "smoke" },
+ });
+
+ const url = `${server.url}/api/runs/${runId}/events?last_event_id=1`;
+ const response = await fetch(url);
+ expect(response.status).toBe(200);
+ expect(response.headers.get("cache-control")).toContain("no-store");
+ expect(response.headers.get("x-accel-buffering")).toBe("no");
+ expect(response.headers.get("connection")).toBe("keep-alive");
+
+ const reader = response.body?.getReader() as
+ | ReadableStreamDefaultReader>
+ | undefined;
+ expect(reader).toBeDefined();
+ if (!reader) return;
+ const decoder = new TextDecoder();
+
+ server.streamHub.publish({
+ runId,
+ kind: "run_finished",
+ payload: { kind: "run_finished", run_id: runId },
+ });
+
+ const collected = await readEnoughBytes(reader, decoder, (text) =>
+ text.includes("event: run_finished"),
+ );
+ await reader.cancel();
+
+ expect(collected).toContain("retry: 2000");
+ expect(collected).toContain("event: run_progress");
+ expect(collected).not.toContain("event: run_started");
+ expect(collected).toContain("event: run_finished");
+ });
+
+ test("metrics record http requests and active sse connections", async () => {
+ const server = await startServer(servers);
+
+ await fetch(`${server.url}/healthz`);
+ const snapshot = server.observability.metrics.snapshot();
+ const requestEntries = snapshot.counters.filter(
+ (entry) => entry.name === METRIC_NAMES.httpRequests,
+ );
+ expect(requestEntries.length).toBeGreaterThan(0);
+ expect(server.observability.metrics.getGauge(METRIC_NAMES.runsActive)).toBe(
+ 0,
+ );
+
+ const runId = "run-metrics";
+ server.streamHub.publish({
+ runId,
+ kind: "run_progress",
+ payload: { kind: "scenario_started" },
+ });
+ const events = await fetch(`${server.url}/api/runs/${runId}/events`);
+ const reader = events.body?.getReader() as
+ | ReadableStreamDefaultReader>
+ | undefined;
+ expect(reader).toBeDefined();
+ if (!reader) return;
+ const decoder = new TextDecoder();
+ await readEnoughBytes(reader, decoder, (text) =>
+ text.includes("event: run_progress"),
+ );
+ expect(
+ server.observability.metrics.getGauge(METRIC_NAMES.sseConnections),
+ ).toBe(1);
+ await reader.cancel();
+ });
+});
diff --git a/tests/unit/dashboard/keyboard-shortcuts.test.tsx b/tests/unit/dashboard/keyboard-shortcuts.test.tsx
new file mode 100644
index 0000000..03cf3e5
--- /dev/null
+++ b/tests/unit/dashboard/keyboard-shortcuts.test.tsx
@@ -0,0 +1,146 @@
+import { beforeEach, describe, expect, test } from "bun:test";
+import { Window } from "happy-dom";
+
+const dashboardWindow = new Window({ url: "http://localhost/" });
+const dashboardDocument = dashboardWindow.document;
+(globalThis as Record).window = dashboardWindow;
+(globalThis as Record).document = dashboardDocument;
+(globalThis as Record).KeyboardEvent = dashboardWindow.KeyboardEvent;
+(globalThis as Record).Element = dashboardWindow.Element;
+(globalThis as Record).HTMLElement = dashboardWindow.HTMLElement;
+
+import {
+ createKeyboardDispatcher,
+ shouldIgnoreKeyboardEvent,
+} from "../../../dashboard/src/hooks/useKeyboardShortcuts.ts";
+
+describe("keyboard shortcut dispatcher", () => {
+ beforeEach(() => {
+ dashboardDocument.body.innerHTML = "";
+ });
+
+ function dispatch(
+ handler: (event: KeyboardEvent) => void,
+ key: string,
+ target?: EventTarget,
+ ): KeyboardEvent {
+ const event = new dashboardWindow.KeyboardEvent("keydown", {
+ key,
+ bubbles: true,
+ cancelable: true,
+ }) as unknown as KeyboardEvent;
+ if (target) {
+ Object.defineProperty(event, "target", { value: target });
+ }
+ handler(event);
+ return event;
+ }
+
+ test("ignores keys dispatched from text inputs", () => {
+ const input = dashboardDocument.createElement("input");
+ input.type = "text";
+ dashboardDocument.body.appendChild(input);
+ expect(shouldIgnoreKeyboardEvent(input)).toBeTrue();
+
+ let calls = 0;
+ const handler = createKeyboardDispatcher({
+ shortcuts: [
+ {
+ key: "j",
+ description: "down",
+ run: () => {
+ calls += 1;
+ },
+ },
+ ],
+ });
+ dispatch(handler, "j", input);
+ expect(calls).toBe(0);
+ });
+
+ test("fires single-key shortcut and calls preventDefault", () => {
+ let calls = 0;
+ const handler = createKeyboardDispatcher({
+ shortcuts: [
+ {
+ key: "/",
+ description: "focus",
+ run: () => {
+ calls += 1;
+ },
+ },
+ ],
+ });
+ const event = dispatch(handler, "/");
+ expect(calls).toBe(1);
+ expect(event.defaultPrevented).toBeTrue();
+ });
+
+ test("fires g r sequence only after both keys", () => {
+ let navigated = 0;
+ const handler = createKeyboardDispatcher({
+ shortcuts: [
+ {
+ sequence: ["g", "r"],
+ key: "r",
+ description: "go runs",
+ run: () => {
+ navigated += 1;
+ },
+ },
+ ],
+ });
+ dispatch(handler, "g");
+ expect(navigated).toBe(0);
+ dispatch(handler, "r");
+ expect(navigated).toBe(1);
+ });
+
+ test("resets pending sequence after timeout", () => {
+ let navigated = 0;
+ const handler = createKeyboardDispatcher({
+ shortcuts: [
+ {
+ sequence: ["g", "r"],
+ key: "r",
+ description: "go runs",
+ run: () => {
+ navigated += 1;
+ },
+ },
+ ],
+ sequenceTimeoutMs: 1,
+ });
+ dispatch(handler, "g");
+ const now = Date.now();
+ while (Date.now() - now < 5) {
+ // Spin the event loop past the tiny timeout.
+ }
+ dispatch(handler, "r");
+ expect(navigated).toBe(0);
+ });
+
+ test("ignores when a modifier key is held", () => {
+ let calls = 0;
+ const handler = createKeyboardDispatcher({
+ shortcuts: [
+ {
+ key: "j",
+ description: "down",
+ run: () => {
+ calls += 1;
+ },
+ },
+ ],
+ });
+ const event = new dashboardWindow.KeyboardEvent("keydown", {
+ key: "j",
+ ctrlKey: true,
+ bubbles: true,
+ cancelable: true,
+ }) as unknown as KeyboardEvent;
+ handler(event);
+ expect(calls).toBe(0);
+ expect(event.defaultPrevented).toBeFalse();
+ });
+});
diff --git a/tests/unit/server/observability/logger.test.ts b/tests/unit/server/observability/logger.test.ts
new file mode 100644
index 0000000..ba6c9d9
--- /dev/null
+++ b/tests/unit/server/observability/logger.test.ts
@@ -0,0 +1,56 @@
+import { describe, expect, test } from "bun:test";
+
+import { createLogger } from "../../../../src/runtime/server/observability/logger.ts";
+
+describe("createLogger", () => {
+ test("emits JSON lines with merged base fields", () => {
+ const lines: string[] = [];
+ const logger = createLogger({
+ component: "agentprobe.server",
+ format: "json",
+ sink: (line) => lines.push(line),
+ baseFields: { request_id: "rid-1" },
+ });
+ logger.info("http.request", { method: "GET", status: 200 });
+ expect(lines).toHaveLength(1);
+ const parsed = JSON.parse(lines[0] ?? "");
+ expect(parsed.event).toBe("http.request");
+ expect(parsed.component).toBe("agentprobe.server");
+ expect(parsed.request_id).toBe("rid-1");
+ expect(parsed.method).toBe("GET");
+ expect(parsed.status).toBe(200);
+ expect(parsed.level).toBe("info");
+ });
+
+ test("text format emits component, level, event prefix", () => {
+ const lines: string[] = [];
+ const logger = createLogger({
+ component: "agentprobe.run",
+ format: "text",
+ sink: (line) => lines.push(line),
+ });
+ logger.error("run.error", { run_id: "abc" });
+ expect(lines[0]).toContain("[agentprobe.run]");
+ expect(lines[0]).toContain("error");
+ expect(lines[0]).toContain("run.error");
+ expect(lines[0]).toContain("run_id=abc");
+ });
+
+ test("child loggers inherit base fields and override component", () => {
+ const lines: string[] = [];
+ const parent = createLogger({
+ component: "agentprobe.server",
+ format: "json",
+ sink: (line) => lines.push(line),
+ baseFields: { request_id: "rid-1" },
+ });
+ const child = parent.child("agentprobe.run", { run_id: "abc" });
+ child.warn("run.slow", { duration_ms: 1500 });
+ const parsed = JSON.parse(lines[0] ?? "");
+ expect(parsed.component).toBe("agentprobe.run");
+ expect(parsed.request_id).toBe("rid-1");
+ expect(parsed.run_id).toBe("abc");
+ expect(parsed.duration_ms).toBe(1500);
+ expect(parsed.level).toBe("warn");
+ });
+});
diff --git a/tests/unit/server/observability/metrics.test.ts b/tests/unit/server/observability/metrics.test.ts
new file mode 100644
index 0000000..0e26e5c
--- /dev/null
+++ b/tests/unit/server/observability/metrics.test.ts
@@ -0,0 +1,73 @@
+import { describe, expect, test } from "bun:test";
+
+import {
+ MetricsRegistry,
+ SERVER_METRIC_NAMES,
+} from "../../../../src/runtime/server/observability/metrics.ts";
+
+describe("MetricsRegistry", () => {
+ test("increments counters and groups by labels", () => {
+ const metrics = new MetricsRegistry();
+ metrics.incrementCounter(SERVER_METRIC_NAMES.httpRequests, 1, {
+ method: "GET",
+ route: "/api/runs",
+ status: 200,
+ });
+ metrics.incrementCounter(SERVER_METRIC_NAMES.httpRequests, 1, {
+ method: "GET",
+ route: "/api/runs",
+ status: 200,
+ });
+ metrics.incrementCounter(SERVER_METRIC_NAMES.httpRequests, 1, {
+ method: "POST",
+ route: "/api/runs",
+ status: 202,
+ });
+
+ expect(
+ metrics.getCounter(SERVER_METRIC_NAMES.httpRequests, {
+ method: "GET",
+ route: "/api/runs",
+ status: 200,
+ }),
+ ).toBe(2);
+ const snapshot = metrics.snapshot();
+ const requestCounters = snapshot.counters.filter(
+ (entry) => entry.name === SERVER_METRIC_NAMES.httpRequests,
+ );
+ expect(requestCounters).toHaveLength(2);
+ });
+
+ test("gauges track active and total separately", () => {
+ const metrics = new MetricsRegistry();
+ metrics.adjustGauge(SERVER_METRIC_NAMES.runsActive, 1);
+ metrics.adjustGauge(SERVER_METRIC_NAMES.runsActive, 1);
+ metrics.adjustGauge(SERVER_METRIC_NAMES.runsActive, -1);
+ metrics.incrementCounter(SERVER_METRIC_NAMES.runsStartedTotal, 1);
+ metrics.incrementCounter(SERVER_METRIC_NAMES.runsFinishedTotal, 1);
+
+ expect(metrics.getGauge(SERVER_METRIC_NAMES.runsActive)).toBe(1);
+ expect(metrics.getCounter(SERVER_METRIC_NAMES.runsStartedTotal)).toBe(1);
+ expect(metrics.getCounter(SERVER_METRIC_NAMES.runsFinishedTotal)).toBe(1);
+ });
+
+ test("snapshot is sorted by name", () => {
+ const metrics = new MetricsRegistry();
+ metrics.incrementCounter("z.alpha", 1);
+ metrics.incrementCounter("a.alpha", 1);
+ const snapshot = metrics.snapshot();
+ expect(snapshot.counters.map((entry) => entry.name)).toEqual([
+ "a.alpha",
+ "z.alpha",
+ ]);
+ });
+
+ test("reset clears all state", () => {
+ const metrics = new MetricsRegistry();
+ metrics.incrementCounter("c", 5);
+ metrics.setGauge("g", 3);
+ metrics.reset();
+ expect(metrics.snapshot().counters).toEqual([]);
+ expect(metrics.snapshot().gauges).toEqual([]);
+ });
+});
diff --git a/tests/unit/server/observability/redaction.test.ts b/tests/unit/server/observability/redaction.test.ts
new file mode 100644
index 0000000..abdb0f7
--- /dev/null
+++ b/tests/unit/server/observability/redaction.test.ts
@@ -0,0 +1,54 @@
+import { describe, expect, test } from "bun:test";
+
+import type { ServerConfig } from "../../../../src/runtime/server/config.ts";
+import {
+ isSecretKey,
+ redactRecord,
+ redactSecretValue,
+ summarizeServerConfig,
+} from "../../../../src/runtime/server/observability/redaction.ts";
+
+describe("redaction", () => {
+ test("redactSecretValue masks long secrets and keeps length signal", () => {
+ expect(redactSecretValue("abcd1234")).toBe("[redacted]:8c");
+ expect(redactSecretValue("ab")).toBe("[redacted]");
+ expect(redactSecretValue(undefined)).toBe("");
+ expect(redactSecretValue(null)).toBe("");
+ });
+
+ test("isSecretKey covers common credential patterns", () => {
+ expect(isSecretKey("token")).toBeTrue();
+ expect(isSecretKey("api_key")).toBeTrue();
+ expect(isSecretKey("Authorization")).toBeTrue();
+ expect(isSecretKey("port")).toBeFalse();
+ });
+
+ test("redactRecord redacts nested secret keys", () => {
+ const out = redactRecord({
+ host: "127.0.0.1",
+ auth: { token: "supersecretvalue", scope: "read" },
+ });
+ expect(out.host).toBe("127.0.0.1");
+ const auth = out.auth as Record;
+ expect(auth.token).toBe("[redacted]:16c");
+ expect(auth.scope).toBe("read");
+ });
+
+ test("summarizeServerConfig redacts token and db url", () => {
+ const config: ServerConfig = {
+ host: "127.0.0.1",
+ port: 7878,
+ dataPath: "/tmp/data",
+ dbUrl: "postgres://user:secretpw@db.example/agentprobe",
+ dashboardDist: undefined,
+ token: "tok_super_long_value",
+ corsOrigins: [],
+ unsafeExpose: false,
+ openBrowser: false,
+ logFormat: "json",
+ };
+ const summary = summarizeServerConfig(config);
+ expect(summary.token).toBe("[redacted]:20c");
+ expect(String(summary.db_url)).not.toContain("secretpw");
+ });
+});
diff --git a/tests/unit/server/observability/spans.test.ts b/tests/unit/server/observability/spans.test.ts
new file mode 100644
index 0000000..568eaa3
--- /dev/null
+++ b/tests/unit/server/observability/spans.test.ts
@@ -0,0 +1,54 @@
+import { describe, expect, test } from "bun:test";
+
+import {
+ SERVER_SPAN_NAMES,
+ SpanRecorder,
+} from "../../../../src/runtime/server/observability/spans.ts";
+
+describe("SpanRecorder", () => {
+ test("records duration and ok status for synchronous work", () => {
+ const recorder = new SpanRecorder();
+ const scope = recorder.start(SERVER_SPAN_NAMES.runStartValidation, {
+ preset_id: "preset-1",
+ });
+ scope.setAttribute("note", "ok");
+ scope.setStatus("ok");
+ scope.end();
+ const records = recorder.snapshot();
+ expect(records).toHaveLength(1);
+ const [record] = records;
+ expect(record).toBeDefined();
+ if (!record) return;
+ expect(record.name).toBe(SERVER_SPAN_NAMES.runStartValidation);
+ expect(record.status).toBe("ok");
+ expect(record.attributes).toMatchObject({
+ preset_id: "preset-1",
+ note: "ok",
+ });
+ expect(record.durationMs).toBeGreaterThanOrEqual(0);
+ });
+
+ test("withSpan captures errors and rethrows", async () => {
+ const recorder = new SpanRecorder();
+ await expect(
+ recorder.withSpan("server.test", {}, () => {
+ throw new Error("boom");
+ }),
+ ).rejects.toThrow("boom");
+ const records = recorder.snapshot();
+ expect(records).toHaveLength(1);
+ const [record] = records;
+ expect(record).toBeDefined();
+ if (!record) return;
+ expect(record.status).toBe("error");
+ expect(record.error?.message).toBe("boom");
+ });
+
+ test("end is idempotent", () => {
+ const recorder = new SpanRecorder();
+ const scope = recorder.start("s");
+ scope.end();
+ scope.end();
+ expect(recorder.snapshot()).toHaveLength(1);
+ });
+});
|