
Commit 17d9e66

aksOpsclaude and Claude Opus 4.7 (1M context) authored
feat(telemetry): add per-signal ingest E2E latency histogram (#63)
* checkpoint: pre-yolo 2026-04-28T07:25:17

* feat(telemetry): add per-signal ingest E2E latency histogram

Closes the production-readiness gap where there was no way to set or alert on a p99 ingest SLO: IngestionRate was a counter only, and GRPCRequestDuration covered the gRPC layer broadly without a per-signal split.

Adds the otelcontext_ingest_duration_seconds{signal} histogram, observed via defer time.Since(start) in TraceServer/LogsServer/MetricsServer.Export. The HTTP OTLP handler delegates to the same Export methods, so both transports record uniformly through a single instrumentation site.

- New label "signal" ∈ {traces, logs, metrics} for per-signal SLO alerting
- Buckets cover 1ms..10s (the typical OTLP ingest range)
- Nil-safe ObserveIngestDuration helper protects ingest tests that pass a nil telemetry.Metrics
- Subtests in metrics_pool_test.go assert that the per-label count advances and that a nil receiver is safe

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
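For context on the "single instrumentation site" claim, the sketch below shows roughly how an HTTP OTLP endpoint can funnel into the same Export method that the gRPC server registers, so the defer inside Export times both transports. It is illustrative only: the handler name, wiring, and protobuf-only decoding are assumptions, not the repo's actual HTTP handler.

```go
package ingest // illustrative placement only

import (
	"io"
	"net/http"

	coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
	"google.golang.org/protobuf/proto"
)

// handleHTTPTraces is a hypothetical HTTP OTLP endpoint (the repo's real
// handler may differ). The key point: it delegates to the same
// TraceServer.Export that gRPC serves, so the defer/time.Since added by this
// commit observes otelcontext_ingest_duration_seconds{signal="traces"} for
// both transports from one place.
func handleHTTPTraces(s *TraceServer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var req coltracepb.ExportTraceServiceRequest
		if err := proto.Unmarshal(body, &req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		if _, err := s.Export(r.Context(), &req); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}
```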
1 parent 83fdbf7 commit 17d9e66

4 files changed: 66 additions & 1 deletion


AGENTS.md

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 <claude-mem-context>
 # Memory Context
 
-# [otelcontext] recent context, 2026-04-28 1:14am UTC
+# [otelcontext] recent context, 2026-04-28 6:43am UTC
 
 No previous sessions found.
 </claude-mem-context>

internal/ingest/otlp.go

Lines changed: 6 additions & 0 deletions
@@ -213,6 +213,8 @@ func (s *MetricsServer) SetMetricCallback(cb func(tsdb.RawMetric)) {
 
 // Export handles incoming OTLP metrics data.
 func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error) {
+	start := time.Now()
+	defer func() { s.metrics.ObserveIngestDuration("metrics", time.Since(start)) }()
 	for _, resourceMetrics := range req.ResourceMetrics {
 		serviceName := getServiceName(resourceMetrics.Resource.Attributes)
 
@@ -283,6 +285,8 @@ func (s *MetricsServer) Export(ctx context.Context, req *colmetricspb.ExportMetr
 
 // Export handles incoming OTLP trace data.
 func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
+	start := time.Now()
+	defer func() { s.metrics.ObserveIngestDuration("traces", time.Since(start)) }()
 	slog.Debug("📥 [TRACES] Received Request", "resource_spans", len(req.ResourceSpans))
 
 	type batchResult struct {
@@ -543,6 +547,8 @@ func (s *TraceServer) Export(ctx context.Context, req *coltracepb.ExportTraceSer
 
 // Export handles incoming OTLP log data.
 func (s *LogsServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) {
+	start := time.Now()
+	defer func() { s.metrics.ObserveIngestDuration("logs", time.Since(start)) }()
 	// slog.Debug("📥 [LOGS] Received Request", "resource_logs", len(req.ResourceLogs))
 
 	logResults := make([][]storage.Log, len(req.ResourceLogs))
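A side note on the pattern in these hunks (a Go-semantics observation, not something stated in the commit): the closure form of the defer matters. Arguments to a deferred call are evaluated when the defer statement runs, so deferring ObserveIngestDuration(..., time.Since(start)) directly would record a near-zero duration. A minimal self-contained sketch:

```go
package main

import (
	"fmt"
	"time"
)

func observe(label string, d time.Duration) { fmt.Printf("%s: %v\n", label, d) }

func handler() {
	start := time.Now()
	// Deferred call arguments are evaluated here, at the defer statement,
	// so this records roughly 0s rather than the handler's latency.
	defer observe("evaluated-at-defer", time.Since(start))
	// The closure delays the time.Since call until handler returns, which is
	// the form the commit uses in each Export method.
	defer func() { observe("evaluated-at-return", time.Since(start)) }()

	time.Sleep(25 * time.Millisecond) // stand-in for real ingest work
}

func main() { handler() }
```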

internal/telemetry/metrics.go

Lines changed: 23 additions & 0 deletions
@@ -28,6 +28,12 @@ type Metrics struct {
 	DBLatency prometheus.Histogram
 	DLQSize   prometheus.Gauge
 
+	// IngestDurationSeconds is the per-Export E2E latency observed inside
+	// the OTLP servers (gRPC + HTTP), labeled by signal {traces,logs,metrics}.
+	// Drives ingest SLOs: alert on p99 / error budget burn rather than on the
+	// blunt OtelContext_grpc_request_duration_seconds aggregate.
+	IngestDurationSeconds *prometheus.HistogramVec
+
 	// --- gRPC ---
 	GRPCRequestsTotal   *prometheus.CounterVec
 	GRPCRequestDuration *prometheus.HistogramVec
@@ -153,6 +159,12 @@ func New() *Metrics {
 			Help: "Number of files currently in the Dead Letter Queue.",
 		}),
 
+		IngestDurationSeconds: promauto.NewHistogramVec(prometheus.HistogramOpts{
+			Name:    "otelcontext_ingest_duration_seconds",
+			Help:    "End-to-end OTLP Export latency observed in the ingest server, by signal.",
+			Buckets: []float64{.001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
+		}, []string{"signal"}),
+
 		// gRPC
 		GRPCRequestsTotal: promauto.NewCounterVec(prometheus.CounterOpts{
 			Name: "OtelContext_grpc_requests_total",
@@ -401,6 +413,17 @@ func (m *Metrics) RecordIngestion(count int) {
 	m.totalIngested.Add(int64(count))
 }
 
+// ObserveIngestDuration records an end-to-end OTLP Export latency for the
+// given signal. Callers should pass time.Since(start) measured from the very
+// start of the Export handler. Nil-safe so the OTLP servers can be wired
+// without a Metrics instance during tests.
+func (m *Metrics) ObserveIngestDuration(signal string, d time.Duration) {
+	if m == nil || m.IngestDurationSeconds == nil {
+		return
+	}
+	m.IngestDurationSeconds.WithLabelValues(signal).Observe(d.Seconds())
+}
+
 func (m *Metrics) SetActiveConnections(n int) {
 	m.ActiveConnections.Set(float64(n))
 	m.activeConns.Store(int64(n))
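To see what the new series looks like on a scrape, here is a minimal usage sketch. It assumes a hypothetical cmd/ program inside the same module (the import path below is made up); telemetry.New() and ObserveIngestDuration are the functions shown in this diff, and since promauto registers against the default registry, promhttp.Handler() will serve the histogram.

```go
package main

import (
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	// Hypothetical module path; internal/ packages are importable only from
	// within the same module, e.g. from a cmd/ tool in this repo.
	"example.com/otelcontext/internal/telemetry"
)

func main() {
	m := telemetry.New() // promauto registration happens here

	// Simulate one Export per signal; in the real servers this is driven by
	// the defer/time.Since at the top of each Export method.
	m.ObserveIngestDuration("traces", 12*time.Millisecond)
	m.ObserveIngestDuration("logs", 3*time.Millisecond)
	m.ObserveIngestDuration("metrics", 40*time.Millisecond)

	// curl localhost:2112/metrics | grep otelcontext_ingest_duration_seconds
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}
```

From there, a p99 SLO alert can be built with histogram_quantile(0.99, ...) over rate(otelcontext_ingest_duration_seconds_bucket[5m]) grouped by signal and le, which is the per-signal alerting the commit message and the field comment above refer to.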

internal/telemetry/metrics_pool_test.go

Lines changed: 36 additions & 0 deletions
@@ -3,6 +3,7 @@ package telemetry
 import (
 	"database/sql"
 	"testing"
+	"time"
 
 	_ "github.com/glebarez/go-sqlite" // registers "sqlite" driver used by glebarez/sqlite GORM dialect
 	"github.com/prometheus/client_golang/prometheus"
@@ -65,4 +66,39 @@ func TestSampleDBPoolStats(t *testing.T) {
 		var m2 *Metrics
 		m2.SampleDBPoolStats(nil)
 	})
+
+	t.Run("ObserveIngestDuration_RecordsByLabel", func(t *testing.T) {
+		// Observe a duration for each signal and assert the histogram count
+		// increases for the matching label only — verifies the per-signal
+		// label split is wired correctly.
+		for _, signal := range []string{"traces", "logs", "metrics"} {
+			before := histCountForTest(t, m.IngestDurationSeconds, signal)
+			m.ObserveIngestDuration(signal, 25*time.Millisecond)
+			after := histCountForTest(t, m.IngestDurationSeconds, signal)
+			if after != before+1 {
+				t.Fatalf("signal=%s: count did not advance: before=%d after=%d", signal, before, after)
+			}
+		}
+	})
+
+	t.Run("ObserveIngestDuration_NilSafe", func(t *testing.T) {
+		// nil receiver must not panic — protects ingest tests that pass nil
+		// telemetry.Metrics through to the OTLP servers.
+		var m2 *Metrics
+		m2.ObserveIngestDuration("traces", time.Millisecond)
+	})
 }
+
+// histCountForTest scrapes the cumulative count of a labeled histogram.
+func histCountForTest(t *testing.T, h *prometheus.HistogramVec, label string) uint64 {
+	t.Helper()
+	hist, err := h.GetMetricWithLabelValues(label)
+	if err != nil {
+		t.Fatalf("GetMetricWithLabelValues(%q): %v", label, err)
+	}
+	var dm dto.Metric
+	if err := hist.(prometheus.Metric).Write(&dm); err != nil {
+		t.Fatalf("histogram write: %v", err)
+	}
+	return dm.GetHistogram().GetSampleCount()
 }
