Commit e4631a4

Authored by aksOps and claude
feat(ingest): add STORE_MIN_SEVERITY second-tier severity gate (#77)
Adds a second log-severity threshold that runs at the persist boundary inside the async ingest pipeline. Logs above INGEST_MIN_SEVERITY but below STORE_MIN_SEVERITY are dropped from the BatchCreateAll write but still flow through LogCallback, so in-memory enrichers (vectordb, GraphRAG Drain template mining, span/trace correlation) keep seeing them. Use case: keep SQLite small while letting in-memory anomaly detection benefit from the verbose stream. Example: INGEST_MIN_SEVERITY=DEBUG STORE_MIN_SEVERITY=WARN

Default behavior is unchanged — empty STORE_MIN_SEVERITY means "use the ingest threshold", so the gate is a no-op out of the box. Setting the store threshold ≤ ingest threshold is also a no-op (logged as a warning).

Implementation

- internal/config/config.go: add StoreMinSeverity field + STORE_MIN_SEVERITY env
- internal/ingest/pipeline.go: storeMinSeverity field, SetStoreMinSeverity setter, filter inside process() that splits b.Logs into persisted vs callback-only, atomic StoreFiltered counter, exposed via PipelineStats
- internal/ingest/otlp.go: export ParseSeverity wrapper for main.go wiring
- main.go: parse both thresholds, only enable the gate when store > ingest, warn on misconfig
- CLAUDE.md: document both thresholds with use-case examples

Tests

- TestPipeline_StoreMinSeverity_DropsBelowThresholdFromPersist: WARN gate drops INFO from persist but INFO still reaches LogCallback; StoreFiltered=1
- TestPipeline_StoreMinSeverity_Disabled_PersistsAllLogs: legacy path unchanged when SetStoreMinSeverity not called

Verification

- go vet ./... clean
- go build ./... clean
- go test -race -timeout 180s ./... — 518 passed in 28 packages (+2 new)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0c92a4a commit e4631a4

6 files changed: 212 additions & 2 deletions

CLAUDE.md

Lines changed: 1 addition & 0 deletions

@@ -217,6 +217,7 @@ Key settings in `internal/config/config.go`:
 - `LOG_FTS_ENABLED` (false) — when truthy (`true`/`yes`/`on`/`1`), provisions the SQLite FTS5 `logs_fts` virtual table + sync triggers at startup; when false, log-search uses vectordb (semantic) plus a 24h-clamped LIKE fallback. Toggle off and reclaim disk via `POST /api/admin/drop_fts` (refused while the flag is on).
 - `DLQ_MAX_FILES` (1000), `DLQ_MAX_DISK_MB` (500), `DLQ_MAX_RETRIES` (10)
 - `GRAPHRAG_WORKER_COUNT` (16), `GRAPHRAG_EVENT_QUEUE_SIZE` (100000) — sized for 100–200 services; raise further if `otelcontext_graphrag_events_dropped_total` climbs
+- `INGEST_MIN_SEVERITY` (`INFO`), `STORE_MIN_SEVERITY` (`""` = same as ingest) — two-tier log severity gate. The ingest gate runs at the OTLP receiver and **drops the log entirely** below the threshold (no in-memory enrichment either). The store gate runs at the persist boundary inside the async pipeline (`internal/ingest/pipeline.go:process`) and **only skips the DB row write** — the log still flows through `LogCallback` so vectordb indexing, GraphRAG Drain template mining, and span/trace correlation see it. Use case: `INGEST_MIN_SEVERITY=DEBUG STORE_MIN_SEVERITY=WARN` keeps SQLite small while letting in-memory anomaly detection benefit from the verbose stream. Setting `STORE_MIN_SEVERITY` ≤ `INGEST_MIN_SEVERITY` is a no-op (logged as a warning at startup). Drops surface via `Pipeline.Stats().StoreFiltered`.
 - `INGEST_ASYNC_ENABLED` (true), `INGEST_PIPELINE_QUEUE_SIZE` (50000), `INGEST_PIPELINE_WORKERS` (8) — async ingest pipeline (`internal/ingest/pipeline.go`). Hybrid backpressure: <90% accept all, 90–100% drop healthy batches (errors/slow always pass), 100% return gRPC `RESOURCE_EXHAUSTED`. Set `INGEST_ASYNC_ENABLED=false` to revert to synchronous DB writes inside `Export()`. Drops surface as `otelcontext_ingest_pipeline_dropped_total{signal,reason}`.
 - `GRPC_MAX_RECV_MB` (16), `GRPC_MAX_CONCURRENT_STREAMS` (1000) — OTLP gRPC server caps, validated to 1..256 and 1..1_000_000
 - `RETENTION_BATCH_SIZE` (50000), `RETENTION_BATCH_SLEEP_MS` (1) — purge pacing; raise the sleep on busy production DBs

internal/config/config.go

Lines changed: 9 additions & 0 deletions

@@ -25,6 +25,14 @@ type Config struct {
 	IngestAllowedServices  string
 	IngestExcludedServices string

+	// Storage Filtering. Logs that pass IngestMinSeverity (so they reach the
+	// receiver and feed in-memory consumers like vectordb / GraphRAG) but
+	// fall below StoreMinSeverity are skipped during the DB persist pass —
+	// only the row-write is dropped, not the in-memory enrichment. Empty
+	// (default) means StoreMinSeverity == IngestMinSeverity, i.e. no
+	// behavior change vs. the single-threshold semantics.
+	StoreMinSeverity string
+
 	// DB Connection Pool
 	DBMaxOpenConns int
 	DBMaxIdleConns int

@@ -244,6 +252,7 @@ func Load(customPath string) (*Config, error) {
 		DLQReplayInterval: getEnv("DLQ_REPLAY_INTERVAL", "5m"),

 		IngestMinSeverity:      getEnv("INGEST_MIN_SEVERITY", "INFO"),
+		StoreMinSeverity:       getEnv("STORE_MIN_SEVERITY", ""),
 		IngestAllowedServices:  getEnv("INGEST_ALLOWED_SERVICES", ""),
 		IngestExcludedServices: getEnv("INGEST_EXCLUDED_SERVICES", ""),

internal/ingest/otlp.go

Lines changed: 5 additions & 0 deletions

@@ -682,6 +682,11 @@ func getServiceName(attrs []*commonpb.KeyValue) string {
 	return "unknown-service"
 }

+// ParseSeverity is the exported wrapper for parseSeverity. Used by main.go
+// to translate the STORE_MIN_SEVERITY env value into the integer rank the
+// pipeline's second-tier filter expects.
+func ParseSeverity(level string) int { return parseSeverity(level) }
+
 // Filtering Helpers
 func parseSeverity(level string) int {
 	switch strings.ToUpper(level) {
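The hunk above ends at the top of `parseSeverity`. For reference, here is a minimal standalone sketch of the rank mapping described by the `SetStoreMinSeverity` doc comment elsewhere in this diff (`"DEBUG"`=10 through `"FATAL"`=50, with INFO/WARN/ERROR at 20/30/40 per the test's comments); the fallback rank for unrecognized levels is an assumption, not taken from the source:

```go
package main

import (
	"fmt"
	"strings"
)

// parseSeverity maps a severity name to its integer rank. Comparison is
// case-insensitive, matching the strings.ToUpper switch in otlp.go.
func parseSeverity(level string) int {
	switch strings.ToUpper(level) {
	case "DEBUG":
		return 10
	case "INFO":
		return 20
	case "WARN":
		return 30
	case "ERROR":
		return 40
	case "FATAL":
		return 50
	default:
		return 0 // assumed fallback; the shipped default may differ
	}
}

func main() {
	fmt.Println(parseSeverity("warn"))                         // 30
	fmt.Println(parseSeverity("INFO") < parseSeverity("WARN")) // true
}
```

Because ranks are plain integers, the `storeRank > ingestRank` check in main.go is a simple numeric comparison.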

internal/ingest/pipeline.go

Lines changed: 47 additions & 2 deletions

@@ -152,6 +152,15 @@ type Pipeline struct {
 	tenantMu       sync.Mutex
 	tenantInFlight map[string]int

+	// storeMinSeverity is the second-tier severity gate applied at persist
+	// time inside process(). Logs in a Batch with severity below this
+	// threshold are dropped from the BatchCreateAll write but still feed
+	// the LogCallback (so vectordb / GraphRAG / Drain mining still see
+	// them). 0 disables the second tier — every log that survived
+	// IngestMinSeverity at the receiver is also persisted.
+	storeMinSeverity int
+	storeFiltered    atomic.Int64
+
 	stopCh chan struct{}
 	once   sync.Once
 	wg     sync.WaitGroup

@@ -225,6 +234,22 @@ func (p *Pipeline) SetPerTenantCap(n int) {
 	p.perTenantCap = n
 }

+// SetStoreMinSeverity configures the second-tier severity gate applied at
+// persist time. Logs below `level` are dropped from the BatchCreateAll write
+// but still flow through the LogCallback so in-memory consumers (vectordb,
+// GraphRAG Drain mining, anomaly correlation) keep working. 0 disables the
+// second tier — every log surviving IngestMinSeverity at the receiver is
+// also persisted (legacy behavior).
+//
+// `level` is the integer rank from parseSeverity ("DEBUG"=10 .. "FATAL"=50).
+// Startup-only — call before Start().
+func (p *Pipeline) SetStoreMinSeverity(level int) {
+	if level < 0 {
+		level = 0
+	}
+	p.storeMinSeverity = level
+}
+
 // TenantDropped reports the cumulative number of healthy submissions
 // rejected because the submitting tenant was at the per-tenant cap.
 // Distinct from RejectedFull (queue at hard capacity) and

@@ -353,6 +378,7 @@ func (p *Pipeline) Stats() PipelineStats {
 		DroppedHealthy:  p.droppedHealthy.Load(),
 		RejectedFull:    p.rejectedFull.Load(),
 		ProcessFailures: p.processFailures.Load(),
+		StoreFiltered:   p.storeFiltered.Load(),
 		QueueDepth:      len(p.queue),
 		Capacity:        p.cfg.Capacity,
 	}

@@ -365,6 +391,7 @@ type PipelineStats struct {
 	DroppedHealthy  int64
 	RejectedFull    int64
 	ProcessFailures int64
+	StoreFiltered   int64 // logs dropped by STORE_MIN_SEVERITY at persist time
 	QueueDepth      int
 	Capacity        int
 }

@@ -434,15 +461,33 @@ func (p *Pipeline) process(b *Batch) {
 		return
 	}

-	if err := p.writer.BatchCreateAll(b.Traces, b.Spans, b.Logs); err != nil {
+	// Apply the second-tier store-severity gate. Logs below the threshold
+	// are dropped from the persist set but still flow through the callback
+	// so in-memory enrichers (vectordb, GraphRAG Drain) keep seeing them.
+	logsToPersist := b.Logs
+	if p.storeMinSeverity > 0 && len(b.Logs) > 0 {
+		kept := make([]storage.Log, 0, len(b.Logs))
+		for _, l := range b.Logs {
+			if shouldIngestSeverity(l.Severity, p.storeMinSeverity) {
+				kept = append(kept, l)
+			} else {
+				p.storeFiltered.Add(1)
+			}
+		}
+		logsToPersist = kept
+	}
+
+	if err := p.writer.BatchCreateAll(b.Traces, b.Spans, logsToPersist); err != nil {
 		slog.Error("ingest pipeline: BatchCreateAll failed", "error", err)
 		p.processFailures.Add(1)
 		return
 	}

 	// Callbacks fire only after the transaction commits successfully — a
 	// rolled-back batch must not feed downstream consumers (GraphRAG etc.)
-	// data that no longer exists in the DB.
+	// data that no longer exists in the DB. The LogCallback intentionally
+	// iterates over the FULL b.Logs slice, not logsToPersist — even logs
+	// dropped by the store-severity gate must reach in-memory enrichers.
 	if b.SpanCallback != nil {
 		for _, s := range b.Spans {
 			b.SpanCallback(s)
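The persist-time split inside `process()` can be exercised in isolation. The sketch below is a simplified stand-in rather than the shipped code: `Log` is a pared-down version of `storage.Log`, `ranks` hard-codes the severity ranks quoted in the test comments, and `splitForPersist` is a hypothetical helper mirroring the gate's keep/drop logic:

```go
package main

import "fmt"

// Log is a minimal stand-in for storage.Log; field names are assumptions.
type Log struct {
	Body     string
	Severity string
}

// ranks hard-codes the severity ranks referenced in this diff's comments.
var ranks = map[string]int{"DEBUG": 10, "INFO": 20, "WARN": 30, "ERROR": 40, "FATAL": 50}

// splitForPersist mirrors the gate in process(): logs at or above storeMin
// go to the DB write; the rest are counted as filtered. The caller still
// feeds the FULL input slice to the LogCallback, so in-memory enrichers
// see every log regardless of the persist decision.
func splitForPersist(logs []Log, storeMin int) ([]Log, int) {
	if storeMin <= 0 {
		return logs, 0 // gate disabled: legacy single-threshold behavior
	}
	kept := make([]Log, 0, len(logs))
	filtered := 0
	for _, l := range logs {
		if ranks[l.Severity] >= storeMin {
			kept = append(kept, l)
		} else {
			filtered++
		}
	}
	return kept, filtered
}

func main() {
	batch := []Log{{"info-row", "INFO"}, {"warn-row", "WARN"}, {"err-row", "ERROR"}}
	kept, dropped := splitForPersist(batch, ranks["WARN"])
	fmt.Println(len(kept), dropped) // 2 1
}
```

This matches the first test below: a WARN gate persists 2 of 3 logs and reports exactly 1 filtered.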

internal/ingest/pipeline_test.go

Lines changed: 124 additions & 0 deletions

@@ -547,3 +547,127 @@ func TestPipeline_PanicInCallbackRecovered(t *testing.T) {
 		t.Errorf("expected ProcessFailures > 0 after callback panic")
 	}
 }
+
+// TestPipeline_StoreMinSeverity_DropsBelowThresholdFromPersist verifies that
+// when SetStoreMinSeverity is configured, logs below the threshold are
+// dropped from BatchCreateAll — but the LogCallback still fires for them
+// so in-memory enrichers (vectordb, GraphRAG) keep seeing every log that
+// passed IngestMinSeverity at the receiver.
+func TestPipeline_StoreMinSeverity_DropsBelowThresholdFromPersist(t *testing.T) {
+	t.Parallel()
+	w := &fakeWriter{}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 10, Workers: 1, SoftThreshold: 0.9})
+	// Threshold = WARN (rank 30); INFO (20) is below, ERROR (40) is above.
+	p.SetStoreMinSeverity(ParseSeverity("WARN"))
+	p.Start(context.Background())
+	defer p.Stop()
+
+	var callbackSeen []string
+	var cbMu sync.Mutex
+	cb := func(l storage.Log) {
+		cbMu.Lock()
+		defer cbMu.Unlock()
+		callbackSeen = append(callbackSeen, l.Severity)
+	}
+
+	b := &Batch{
+		Type:   SignalLogs,
+		Tenant: "t1",
+		Logs: []storage.Log{
+			{Body: "info-row", Severity: "INFO"},
+			{Body: "warn-row", Severity: "WARN"},
+			{Body: "err-row", Severity: "ERROR"},
+		},
+		LogCallback: cb,
+	}
+	if err := p.Submit(b); err != nil {
+		t.Fatalf("submit: %v", err)
+	}
+
+	if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed >= 1 }) {
+		t.Fatalf("batch never processed")
+	}
+
+	// Persist: only WARN + ERROR should reach the writer.
+	w.mu.Lock()
+	persistedCount := 0
+	persistedSeverities := []string{}
+	for _, call := range w.logsCalls {
+		for _, l := range call {
+			persistedCount++
+			persistedSeverities = append(persistedSeverities, l.Severity)
+		}
+	}
+	w.mu.Unlock()
+	if persistedCount != 2 {
+		t.Fatalf("expected 2 logs persisted (WARN+ERROR), got %d: %v", persistedCount, persistedSeverities)
+	}
+	for _, sev := range persistedSeverities {
+		if sev == "INFO" {
+			t.Errorf("INFO log was persisted but should have been gated by store-min-severity")
+		}
+	}
+
+	// Callback: must fire for ALL THREE logs (INFO included), since the
+	// in-memory enrichment path is independent of the persist gate.
+	cbMu.Lock()
+	defer cbMu.Unlock()
+	if len(callbackSeen) != 3 {
+		t.Fatalf("expected LogCallback to fire 3 times (incl. gated INFO), got %d: %v", len(callbackSeen), callbackSeen)
+	}
+	infoCb := false
+	for _, sev := range callbackSeen {
+		if sev == "INFO" {
+			infoCb = true
+		}
+	}
+	if !infoCb {
+		t.Errorf("INFO log did not reach LogCallback — in-memory enrichment path broken: %v", callbackSeen)
+	}
+
+	// Stats: storeFiltered should report exactly 1 (the INFO drop).
+	if got := p.Stats().StoreFiltered; got != 1 {
+		t.Errorf("Stats().StoreFiltered = %d, want 1", got)
+	}
+}
+
+// TestPipeline_StoreMinSeverity_Disabled_PersistsAllLogs verifies the legacy
+// path: when SetStoreMinSeverity is NOT called (or set to 0), every log in a
+// Batch is persisted regardless of severity.
+func TestPipeline_StoreMinSeverity_Disabled_PersistsAllLogs(t *testing.T) {
+	t.Parallel()
+	w := &fakeWriter{}
+	p := NewPipeline(w, nil, PipelineConfig{Capacity: 10, Workers: 1, SoftThreshold: 0.9})
+	// No SetStoreMinSeverity call → gate disabled.
+	p.Start(context.Background())
+	defer p.Stop()
+
+	b := &Batch{
+		Type:   SignalLogs,
+		Tenant: "t1",
+		Logs: []storage.Log{
+			{Body: "info-row", Severity: "INFO"},
+			{Body: "debug-row", Severity: "DEBUG"},
+			{Body: "err-row", Severity: "ERROR"},
+		},
+	}
+	if err := p.Submit(b); err != nil {
+		t.Fatalf("submit: %v", err)
+	}
+	if !waitFor(t, 5*time.Second, func() bool { return p.Stats().Processed >= 1 }) {
+		t.Fatalf("batch never processed")
+	}
+
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	total := 0
+	for _, call := range w.logsCalls {
+		total += len(call)
+	}
+	if total != 3 {
+		t.Fatalf("expected all 3 logs persisted with gate disabled, got %d", total)
+	}
+	if got := p.Stats().StoreFiltered; got != 0 {
+		t.Errorf("Stats().StoreFiltered = %d, want 0 with gate disabled", got)
+	}
+}

main.go

Lines changed: 26 additions & 0 deletions

@@ -505,6 +505,32 @@ func main() {
 		Workers:  cfg.IngestPipelineWorkers,
 	})
 	ingestPipeline.SetPerTenantCap(cfg.IngestPipelinePerTenantCap)
+
+	// Second-tier severity gate. Empty STORE_MIN_SEVERITY means "use the
+	// same threshold as INGEST_MIN_SEVERITY" — i.e. behavior is identical
+	// to the legacy single-threshold path. Only enable the gate when the
+	// store threshold is strictly higher than the ingest threshold; equal
+	// or lower is wasted work since the receiver has already dropped the
+	// affected logs.
+	ingestRank := ingest.ParseSeverity(cfg.IngestMinSeverity)
+	storeRank := ingestRank
+	if cfg.StoreMinSeverity != "" {
+		storeRank = ingest.ParseSeverity(cfg.StoreMinSeverity)
+	}
+	if storeRank > ingestRank {
+		ingestPipeline.SetStoreMinSeverity(storeRank)
+		slog.Info("🪛 Store-severity gate enabled",
+			"ingest_min", cfg.IngestMinSeverity,
+			"store_min", cfg.StoreMinSeverity,
+			"note", "logs below store_min reach in-memory consumers but are not persisted",
+		)
+	} else if cfg.StoreMinSeverity != "" && storeRank < ingestRank {
+		slog.Warn("STORE_MIN_SEVERITY is lower than INGEST_MIN_SEVERITY — has no effect; receiver already filters",
+			"ingest_min", cfg.IngestMinSeverity,
+			"store_min", cfg.StoreMinSeverity,
+		)
+	}
+
 	ingestPipeline.Start(context.Background())
 	traceServer.SetPipeline(ingestPipeline)
 	logsServer.SetPipeline(ingestPipeline)
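Putting the wiring together, a configuration sketch covering the three startup paths; the binary name `./otelcontext` is assumed for illustration:

```shell
# Gate enabled: the receiver ingests DEBUG and above so in-memory enrichers
# (vectordb, GraphRAG) see the verbose stream; only WARN+ rows hit SQLite.
INGEST_MIN_SEVERITY=DEBUG STORE_MIN_SEVERITY=WARN ./otelcontext

# Default: empty STORE_MIN_SEVERITY falls back to the ingest threshold,
# so the gate is a no-op and behavior matches the legacy single-threshold path.
INGEST_MIN_SEVERITY=INFO ./otelcontext

# Misconfiguration: a store threshold below the ingest threshold has no
# effect (the receiver already dropped INFO); startup logs a warning.
INGEST_MIN_SEVERITY=WARN STORE_MIN_SEVERITY=INFO ./otelcontext
```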
