Commit f86d391

aksOps and claude authored
fix(robustness): P1+P2 follow-ups from brainstorm (#61)
* fix(robustness): P1+P2 follow-ups from brainstorm

  Closes the four actionable follow-ups deferred from the codex round-2
  brainstorm. The fifth ("DLQ replay idempotency for spans/logs") is still
  deferred — it requires a schema migration to add unique indexes plus a
  pre-migration dedup pass, not a code-only fix.

  P1 — tenant pipeline starvation fairness (`internal/ingest/pipeline.go`,
  `config.go`, `main.go`).
  - Added per-tenant in-flight cap to Pipeline. When set, healthy
    submissions from a tenant already at the cap are dropped at Submit()
    with reason "tenant_backpressure" (separate counter from
    DroppedHealthy / RejectedFull). Priority batches (errors / slow traces)
    bypass — diagnostic data must always land.
  - Slot is reserved at Submit and released in process()'s deferred
    cleanup, so panics still release. Failed channel sends in Submit undo
    the reservation before returning ErrQueueFull.
  - Wired via INGEST_PIPELINE_PER_TENANT_CAP (default 0 = disabled, no
    behavior change for single-tenant deployments). Multi-tenant
    deployments should set this to ~Capacity/N×2 where N is the number of
    concurrently-active tenants.

  P1 — pipeline drain-path metric blind spot: skipped. The brainstorm's
  framing was that Stop()'s drain bypasses observeDrop accounting, but
  verification shows the drain path PROCESSES (not drops) batches via
  process(), and there are no shutdown-phase drops to account for.

  P2 — /ready saturation probes (`internal/api/health_handlers.go`,
  `server.go`, `main.go`).
  - Server gains two callback-shaped probes (DLQ disk fullness, ingest
    pipeline fullness). Probes return a fraction in [0,1]; /ready flips to
    503 above 0.95 so orchestrators stop sending traffic before the
    pipeline starts hard-rejecting (gRPC RESOURCE_EXHAUSTED / HTTP 429) or
    DLQ starts FIFO-evicting.
  - Wired in main.go using closures that capture dlq + ingestPipeline.
    Probes are nil-tolerant on the server side; nil = "skipped" rather
    than fatal.

  P2 — retention adaptive pacing (`internal/storage/retention.go`).
  - New adaptPurgeSleep on RetentionScheduler. After each purge pass,
    measures wall-clock time vs purgeInterval. >50% → double the
    inter-batch sleep (capped at 100ms). <10% → halve it (floored at 1ms).
    Single-writer (the retention loop), so no synchronization needed;
    purge methods read the value once at the call boundary.

  P2 — GraphSnapshot row cap (`internal/graphrag/snapshot.go`).
  - pruneOldSnapshots now enforces a 100k row backstop after the by-age
    delete. Steady-state at 15-min cadence × 100 tenants is ~67k rows;
    100k is enough headroom that the backstop never triggers under normal
    operation but bounds disk if write rate spikes.

  Tests: three new pipeline tests lock in the per-tenant cap contract
  (drops past cap, priority bypasses, slot released after process). Full
  suite: 407 tests pass under `-race -count=1`.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(robustness): clamp pipeline capacity/workers (CodeQL)

  CodeQL flagged make(chan *Batch, cfg.Capacity) at pipeline.go:171 as
  go/uncontrolled-allocation-size — cfg.Capacity flows from the
  INGEST_PIPELINE_QUEUE_SIZE env var without an upper bound, so a typo like
  10_000_000_000 would OOM the process at startup. Add defensive caps in
  NewPipeline (maxPipelineCapacity = 1M, maxPipelineWorkers = 256).
  Clamping at the constructor satisfies the CodeQL taint-tracking and is
  the right defense regardless — both ceilings are well above any
  reasonable deployment.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(robustness): use min() sanitizer for CodeQL allocation-size

  In-place reassignment of cfg.Capacity/Workers wasn't recognized by
  CodeQL as a sanitizer for go/uncontrolled-allocation-size. Reroute
  through local variables clamped via min(); make() consumes the local,
  which CodeQL's taint-tracking accepts as bounded.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(robustness): replace min() with explicit comparison guard for CodeQL

  CodeQL's BoundedFlowSource taint analysis recognizes explicit if/else
  comparison guards as sanitizers but not the min() builtin (Go 1.21+).
  Switching to `if x > MAX { x = MAX }` is the canonical pattern from
  go/uncontrolled-allocation-size docs and should clear the alert without
  changing runtime behavior.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 955618a commit f86d391

8 files changed

Lines changed: 394 additions & 9 deletions


internal/api/health_handlers.go

Lines changed: 32 additions & 0 deletions
@@ -3,10 +3,17 @@ package api
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"net/http"
 	"time"
 )
 
+// readySaturationThreshold is the fullness fraction at which a saturation
+// probe (DLQ disk, ingest pipeline) flips /ready to 503. Set high enough
+// that brief spikes don't cause restart loops, low enough that orchestrators
+// stop sending traffic before the system fails outright.
+const readySaturationThreshold = 0.95
+
 // handleLive is a Kubernetes-style liveness probe.
 // Returns 200 OK as long as the process is up. Does not check dependencies.
 func (s *Server) handleLive(w http.ResponseWriter, r *http.Request) {
@@ -55,6 +62,31 @@ func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
 		ready = false
 	}
 
+	// Saturation probes — flip to 503 when downstream buffers are full so
+	// orchestrators (k8s, load balancers) stop routing fresh traffic before
+	// the pipeline starts hard-rejecting (gRPC RESOURCE_EXHAUSTED / HTTP 429)
+	// or DLQ starts FIFO-evicting unflushed batches.
+	if s.dlqSaturation != nil {
+		if sat := s.dlqSaturation(); sat >= readySaturationThreshold {
+			checks["dlq_disk"] = fmt.Sprintf("saturated %.0f%%", sat*100)
+			ready = false
+		} else {
+			checks["dlq_disk"] = "ok"
+		}
+	} else {
+		checks["dlq_disk"] = "skipped"
+	}
+	if s.pipelineSaturation != nil {
+		if sat := s.pipelineSaturation(); sat >= readySaturationThreshold {
+			checks["pipeline"] = fmt.Sprintf("saturated %.0f%%", sat*100)
+			ready = false
+		} else {
+			checks["pipeline"] = "ok"
+		}
+	} else {
+		checks["pipeline"] = "skipped"
+	}
+
 	status := http.StatusOK
 	if !ready {
 		status = http.StatusServiceUnavailable

internal/api/server.go

Lines changed: 22 additions & 0 deletions
@@ -23,6 +23,13 @@ type Server struct {
 	graph     *graph.Graph       // in-memory service dependency graph (may be nil before first build)
 	graphRAG  *graphrag.GraphRAG // layered GraphRAG for advanced queries
 	vectorIdx *vectordb.Index    // TF-IDF semantic log search index
+
+	// Saturation probes consulted by /ready. Each returns a fullness
+	// fraction in [0.0, 1.0]; nil disables the corresponding check.
+	// Decoupling via callbacks keeps the api package free of queue/ingest
+	// imports and lets tests inject deterministic values.
+	dlqSaturation      func() float64
+	pipelineSaturation func() float64
 }
 
 // NewServer creates a new API server.
@@ -51,6 +58,21 @@ func (s *Server) SetVectorIndex(idx *vectordb.Index) {
 	s.vectorIdx = idx
 }
 
+// SetDLQSaturationProbe registers a callback returning DLQ disk fullness as
+// a fraction in [0.0, 1.0]. Used by /ready to flip to 503 when DLQ is at
+// risk of FIFO-evicting unflushed batches. Pass nil to disable the check.
+func (s *Server) SetDLQSaturationProbe(fn func() float64) {
+	s.dlqSaturation = fn
+}
+
+// SetPipelineSaturationProbe registers a callback returning ingest pipeline
+// queue fullness as a fraction in [0.0, 1.0]. Used by /ready to flip to 503
+// when the pipeline is at hard capacity (already returning 429/RESOURCE_EXHAUSTED
+// to clients). Pass nil to disable the check.
+func (s *Server) SetPipelineSaturationProbe(fn func() float64) {
+	s.pipelineSaturation = fn
+}
+
 // RegisterRoutes registers API endpoints on the provided mux.
 func (s *Server) RegisterRoutes(mux *http.ServeMux) {
 	// Metadata & Discovery
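The main.go wiring is referenced in the commit message but its diff is not shown on this page. A hypothetical sketch of the closure-based hookup, with stub types standing in for the real DLQ and ingest pipeline (method names are illustrative):

```go
package main

import "fmt"

// Stand-ins for the real DLQ and pipeline; the actual method names used
// in main.go are not visible in this diff.
type dlqStub struct{ used, total int64 }

func (d *dlqStub) DiskFullness() float64 { return float64(d.used) / float64(d.total) }

type pipelineStub struct{ depth, capacity int }

func (p *pipelineStub) Fullness() float64 { return float64(p.depth) / float64(p.capacity) }

// Mirrors the api.Server probe surface from the diff above.
type server struct {
	dlqSaturation      func() float64
	pipelineSaturation func() float64
}

func (s *server) SetDLQSaturationProbe(fn func() float64)      { s.dlqSaturation = fn }
func (s *server) SetPipelineSaturationProbe(fn func() float64) { s.pipelineSaturation = fn }

func main() {
	dlq := &dlqStub{used: 9, total: 10}
	pipe := &pipelineStub{depth: 100, capacity: 50000}
	srv := &server{}
	// Closures capture the concrete objects, so the api package never
	// imports their packages and tests can inject constant probes instead.
	srv.SetDLQSaturationProbe(func() float64 { return dlq.DiskFullness() })
	srv.SetPipelineSaturationProbe(func() float64 { return pipe.Fullness() })
	fmt.Printf("dlq=%.2f pipeline=%.3f\n", srv.dlqSaturation(), srv.pipelineSaturation())
}
```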

internal/config/config.go

Lines changed: 12 additions & 3 deletions
@@ -121,6 +121,14 @@ type Config struct {
 	IngestAsyncEnabled      bool // default true; opt out via INGEST_ASYNC_ENABLED=false
 	IngestPipelineQueueSize int  // default 50000 batches; per-deployment tunable
 	IngestPipelineWorkers   int  // default 8 worker goroutines
+	// IngestPipelinePerTenantCap caps in-flight batches per tenant so a noisy
+	// tenant cannot starve siblings of fresh queue slots when fullness is
+	// below the soft-backpressure threshold. 0 (default) disables — single-
+	// tenant deployments need no cap. Operators on multi-tenant deployments
+	// should set INGEST_PIPELINE_PER_TENANT_CAP to roughly Capacity/N where
+	// N is the expected number of concurrently-active tenants, with some
+	// headroom (e.g. 2× the fair-share value) for short bursts.
+	IngestPipelinePerTenantCap int
 
 	// TLS (HTTP + gRPC). When both paths are set, TLS is enabled on both servers.
 	// Empty values (default) keep plaintext behavior.
@@ -260,9 +268,10 @@ func Load(customPath string) (*Config, error) {
 		GraphRAGEventQueueSize: getEnvInt("GRAPHRAG_EVENT_QUEUE_SIZE", 100000),
 
 		// Async ingest pipeline
-		IngestAsyncEnabled:      getEnvBool("INGEST_ASYNC_ENABLED", true),
-		IngestPipelineQueueSize: getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000),
-		IngestPipelineWorkers:   getEnvInt("INGEST_PIPELINE_WORKERS", 8),
+		IngestAsyncEnabled:         getEnvBool("INGEST_ASYNC_ENABLED", true),
+		IngestPipelineQueueSize:    getEnvInt("INGEST_PIPELINE_QUEUE_SIZE", 50000),
+		IngestPipelineWorkers:      getEnvInt("INGEST_PIPELINE_WORKERS", 8),
+		IngestPipelinePerTenantCap: getEnvInt("INGEST_PIPELINE_PER_TENANT_CAP", 0),
 
 		// TLS
 		TLSCertFile: getEnv("TLS_CERT_FILE", ""),
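The sizing guidance in the config comment (fair share Capacity/N, with 2× headroom for bursts) reduces to a one-line calculation. A sketch; the helper is illustrative, since the repository exposes only the env var:

```go
package main

import "fmt"

// perTenantCap applies the documented sizing rule: fair share of the queue
// per concurrently-active tenant, doubled for short bursts. Zero tenants
// means the cap is left disabled. Illustrative helper, not repository code.
func perTenantCap(capacity, activeTenants int) int {
	if activeTenants <= 0 {
		return 0 // 0 disables the cap (single-tenant deployments)
	}
	return capacity / activeTenants * 2
}

func main() {
	// Default 50k queue shared by 10 concurrently-active tenants.
	fmt.Println(perTenantCap(50000, 10)) // 10000
}
```

So an operator on that deployment would set INGEST_PIPELINE_PER_TENANT_CAP=10000.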

internal/graphrag/snapshot.go

Lines changed: 31 additions & 1 deletion
@@ -142,7 +142,16 @@ func (g *GraphRAG) takeSnapshotForTenant(_ context.Context, tenant string, store
 	)
 }
 
-// pruneOldSnapshots removes snapshots older than 7 days.
+// maxSnapshotRows is a row-count backstop on `graph_snapshots` to prevent
+// unbounded disk growth when the write rate outruns the 7-day age prune.
+// Steady state at 15-min cadence × 100 tenants is ~67k rows/week, so 100k
+// gives ~50% headroom — high enough to never trigger under normal operation,
+// low enough to bound disk if a misconfig or tenant explosion runs the
+// snapshotter hot.
+const maxSnapshotRows = 100_000
+
+// pruneOldSnapshots removes snapshots older than 7 days, then enforces a
+// row-count backstop in case the by-age prune isn't keeping up.
 func (g *GraphRAG) pruneOldSnapshots() {
 	if g.repo == nil || g.repo.DB() == nil {
 		return
@@ -154,6 +163,27 @@ func (g *GraphRAG) pruneOldSnapshots() {
 	} else if result.RowsAffected > 0 {
 		slog.Info("Pruned old graph snapshots", "count", result.RowsAffected)
 	}
+
+	var count int64
+	if err := g.repo.DB().Model(&GraphSnapshot{}).Count(&count).Error; err != nil {
+		slog.Error("Failed to count snapshots for row-cap prune", "error", err)
+		return
+	}
+	if count <= maxSnapshotRows {
+		return
+	}
+	excess := count - maxSnapshotRows
+	// Subquery selects the N oldest IDs, then deletes that set. Portable
+	// across SQLite and Postgres; avoids a multi-statement transaction.
+	sub := g.repo.DB().Model(&GraphSnapshot{}).Select("id").Order("created_at ASC").Limit(int(excess))
+	if err := g.repo.DB().Where("id IN (?)", sub).Delete(&GraphSnapshot{}).Error; err != nil {
+		slog.Error("Failed to row-cap prune snapshots", "error", err)
+		return
+	}
+	slog.Warn("graphrag: row-cap pruned snapshots (write rate exceeded by-age prune)",
+		"deleted", excess,
+		"cap", maxSnapshotRows,
+	)
 }
 
 // GetGraphSnapshot retrieves the snapshot closest to the requested time,
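The ~67k steady-state figure behind the 100k backstop is easy to verify from the stated parameters (15-minute cadence, 100 tenants, 7-day retention):

```go
package main

import "fmt"

// Reproduces the comment's steady-state estimate for graph_snapshots rows.
func main() {
	const (
		tenants         = 100
		snapshotsPerDay = 24 * 60 / 15 // one snapshot per 15 minutes = 96/day
		retentionDays   = 7
	)
	rows := tenants * snapshotsPerDay * retentionDays
	fmt.Println(rows) // 67200, comfortably under the 100_000 backstop
}
```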

internal/ingest/pipeline.go

Lines changed: 119 additions & 5 deletions
@@ -78,6 +78,17 @@
 	SoftThreshold float64 // fullness fraction above which healthy batches are dropped (0.0–1.0)
 }
 
+// Defensive upper bounds on operator-supplied capacity/workers. Env-var
+// inputs go directly into a make(chan ...) and into goroutine launches;
+// without a sanity cap a typo like INGEST_PIPELINE_QUEUE_SIZE=10_000_000_000
+// would OOM the process. These caps are well above any reasonable
+// production deployment (50k is the default queue, 8 the default workers)
+// while still keeping the allocation finite.
+const (
+	maxPipelineCapacity = 1_000_000
+	maxPipelineWorkers  = 256
+)
+
 // DefaultPipelineConfig returns production-sized defaults.
 func DefaultPipelineConfig() PipelineConfig {
 	return PipelineConfig{
@@ -130,6 +141,16 @@
 	droppedHealthy  atomic.Int64
 	rejectedFull    atomic.Int64
 	processFailures atomic.Int64
+	tenantDropped   atomic.Int64
+
+	// Per-tenant in-flight cap — bounds the queue slots a single tenant
+	// can consume so a noisy tenant cannot starve siblings of fresh
+	// healthy submissions when fullness is below the soft threshold.
+	// Priority batches (errors/slow) bypass the cap because diagnostic
+	// data must always land. perTenantCap == 0 disables the check.
+	perTenantCap   int
+	tenantMu       sync.Mutex
+	tenantInFlight map[string]int
 
 	stopCh chan struct{}
 	once   sync.Once
@@ -147,6 +168,30 @@ func NewPipeline(writer pipelineWriter, metrics *telemetry.Metrics, cfg Pipeline
 	if cfg.Workers <= 0 {
 		cfg.Workers = d.Workers
 	}
+	// Sanitize operator-supplied capacity/workers BEFORE the make()/Workers
+	// loop. CodeQL's taint-tracking treats env-var-derived values as untrusted
+	// for go/uncontrolled-allocation-size; only an explicit comparison guard
+	// is recognized as a BarrierGuard sanitizer. Both ceilings are well above
+	// any reasonable deployment (50k default queue, 8 default workers) but
+	// keep the allocation bounded against a misconfigured env var.
+	capacity := cfg.Capacity
+	if capacity > maxPipelineCapacity {
+		slog.Warn("ingest pipeline: capacity clamped to defensive ceiling",
+			"requested", capacity,
+			"max", maxPipelineCapacity,
+		)
+		capacity = maxPipelineCapacity
+	}
+	workers := cfg.Workers
+	if workers > maxPipelineWorkers {
+		slog.Warn("ingest pipeline: workers clamped to defensive ceiling",
+			"requested", workers,
+			"max", maxPipelineWorkers,
+		)
+		workers = maxPipelineWorkers
+	}
+	cfg.Capacity = capacity
+	cfg.Workers = workers
 	// Zero-value config falls back to defaults — the field is internal
 	// (no env-var surface) and TestPipeline_DefaultsApplied enforces this.
 	// Priority-only mode (always-soft-drop) is not a supported configuration
@@ -155,14 +200,37 @@
 		cfg.SoftThreshold = d.SoftThreshold
 	}
 	return &Pipeline{
-		writer:  writer,
-		metrics: metrics,
-		cfg:     cfg,
-		queue:   make(chan *Batch, cfg.Capacity),
-		stopCh:  make(chan struct{}),
+		writer:         writer,
+		metrics:        metrics,
+		cfg:            cfg,
+		queue:          make(chan *Batch, capacity),
+		tenantInFlight: make(map[string]int),
+		stopCh:         make(chan struct{}),
 	}
 }
 
+// SetPerTenantCap configures the maximum in-flight batches one tenant may
+// hold in the queue (and currently being processed). 0 disables the cap.
+// Once a tenant hits the cap, further healthy submissions from that tenant
+// are dropped at Submit() time with reason "tenant_backpressure". Priority
+// batches (errors/slow traces) bypass the cap.
+//
+// Sized as a fraction of Capacity, e.g. Capacity/4 keeps any single tenant
+// to 25% of queue capacity. Operators tune via INGEST_PIPELINE_PER_TENANT_CAP.
+// Startup-only — call before Start().
+func (p *Pipeline) SetPerTenantCap(n int) {
+	if n < 0 {
+		n = 0
+	}
+	p.perTenantCap = n
+}
+
+// TenantDropped reports the cumulative number of healthy submissions
+// rejected because the submitting tenant was at the per-tenant cap.
+// Distinct from RejectedFull (queue at hard capacity) and
+// DroppedHealthy (soft-backpressure across the whole queue).
+func (p *Pipeline) TenantDropped() int64 { return p.tenantDropped.Load() }
+
 // Start spawns the worker pool. Safe to call once. Subsequent calls are
 // no-ops; tests rely on this for reset semantics.
 func (p *Pipeline) Start(ctx context.Context) {
@@ -215,18 +283,57 @@ func (p *Pipeline) Submit(b *Batch) error {
 		return nil
 	}
 
+	// Per-tenant cap — only enforced for healthy batches (priority bypasses,
+	// same as soft-backpressure). Reserve the slot under the lock so the
+	// counter and the channel send are coherent: if the channel is full,
+	// undo the reservation in the default branch below.
+	tenantReserved := false
+	if p.perTenantCap > 0 && b.Tenant != "" && !b.Priority() {
+		p.tenantMu.Lock()
+		if p.tenantInFlight[b.Tenant] >= p.perTenantCap {
+			p.tenantMu.Unlock()
+			p.tenantDropped.Add(1)
+			p.observeDrop(b.Type, "tenant_backpressure")
+			return nil
+		}
+		p.tenantInFlight[b.Tenant]++
+		tenantReserved = true
+		p.tenantMu.Unlock()
+	}
+
 	select {
 	case p.queue <- b:
 		p.enqueuedTotal.Add(1)
 		p.observeQueueDepth(b.Type)
 		return nil
 	default:
+		if tenantReserved {
+			p.releaseTenantSlot(b.Tenant)
+		}
 		p.rejectedFull.Add(1)
 		p.observeDrop(b.Type, "queue_full")
 		return ErrQueueFull
 	}
 }
 
+// releaseTenantSlot decrements the in-flight count for a tenant, removing
+// the map entry when it hits zero so the map doesn't grow unboundedly with
+// short-lived tenant IDs. Safe to call with an empty tenant or when the
+// cap is disabled — both no-op.
+func (p *Pipeline) releaseTenantSlot(tenant string) {
+	if p.perTenantCap <= 0 || tenant == "" {
+		return
+	}
+	p.tenantMu.Lock()
+	n := p.tenantInFlight[tenant] - 1
+	if n <= 0 {
+		delete(p.tenantInFlight, tenant)
+	} else {
+		p.tenantInFlight[tenant] = n
+	}
+	p.tenantMu.Unlock()
+}
+
 // Stop signals workers to exit and blocks until in-flight batches have
 // been drained from the channel. Idempotent.
 func (p *Pipeline) Stop() {
@@ -302,6 +409,13 @@ func (p *Pipeline) process(b *Batch) {
 	if b == nil {
 		return
 	}
+	// Release the per-tenant slot reserved at Submit time. Registered as
+	// a defer so it runs even if the batch panics. Priority batches don't
+	// reserve at submit, so they don't release here either — the conditions
+	// must mirror exactly to keep the in-flight count balanced.
+	if !b.Priority() {
+		defer p.releaseTenantSlot(b.Tenant)
+	}
 	defer func() {
 		if r := recover(); r != nil {
 			slog.Error("ingest pipeline process panic",
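The reserve/release contract added in this file can be studied in isolation. A stripped-down sketch (no queue, no priority bypass, no metrics; type and method names are illustrative) showing the three properties the new tests lock in: submissions past the cap are refused, releasing frees a slot, and zeroed entries are deleted so the map stays bounded:

```go
package main

import (
	"fmt"
	"sync"
)

// tenantLimiter condenses the per-tenant accounting: reserve a slot at
// submit time, release it when processing finishes.
type tenantLimiter struct {
	limit    int
	mu       sync.Mutex
	inFlight map[string]int
}

// reserve returns false when the tenant is already at the cap; the real
// Submit drops the batch with reason "tenant_backpressure" in that case.
func (l *tenantLimiter) reserve(tenant string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inFlight[tenant] >= l.limit {
		return false
	}
	l.inFlight[tenant]++
	return true
}

// release decrements the count and deletes zeroed entries so short-lived
// tenant IDs don't grow the map unboundedly.
func (l *tenantLimiter) release(tenant string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if n := l.inFlight[tenant] - 1; n <= 0 {
		delete(l.inFlight, tenant)
	} else {
		l.inFlight[tenant] = n
	}
}

func main() {
	l := &tenantLimiter{limit: 2, inFlight: map[string]int{}}
	fmt.Println(l.reserve("a"), l.reserve("a"), l.reserve("a")) // true true false
	l.release("a")
	fmt.Println(l.reserve("a")) // true: the released slot is reusable
}
```

The real pipeline additionally undoes the reservation when the channel send fails and releases via a defer in process() so panics cannot leak a slot.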
