Skip to content

Commit 83fdbf7

Browse files
aksOps and claude authored
fix(storage): make span ingest idempotent for DLQ replay (#62)
Adds composite uniqueIndex idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id) so a duplicate span ingest — most commonly a DLQ replay after a partial-batch failure — collapses to a no-op rather than producing double-counted spans in the relational DB and downstream GraphRAG.

Changes:
- Span model: composite uniqueIndex on (tenant_id, trace_id, span_id)
- BatchCreateSpans + BatchCreateAll spans path: OnConflict.DoNothing (MySQL takes INSERT IGNORE) — mirrors existing trace idempotency
- New migrate_spans.go: dedupes pre-existing duplicate spans BEFORE AutoMigrate adds the unique index, so upgrades from pre-RAN-65 deployments don't fail on legacy duplicates
- Test helper: distinct SpanIDs per seeded span so tests still create the expected count
- 6 new tests covering: duplicate-insert no-op, cross-tenant key isolation, BatchCreateAll replay idempotency, dedupe migration on pre-existing dupes, no-op on fresh DB, no-op once unique index exists, AutoMigrate creates the unique index

Logs remain non-idempotent (OTLP logs lack a stable identifier); called out explicitly in BatchCreateAll's doc comment as separate future work.

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f86d391 commit 83fdbf7

6 files changed

Lines changed: 427 additions & 15 deletions

File tree

internal/storage/factory.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,15 @@ func AutoMigrateModelsWithOptions(db *gorm.DB, driver string, opts MigrateOption
238238
logsPartitioned = true
239239
}
240240

241+
// Dedupe spans BEFORE AutoMigrate adds the composite uniqueIndex
242+
// idx_spans_tenant_trace_span on (tenant_id, trace_id, span_id).
243+
// Pre-RAN-65 deployments may have duplicates from DLQ replays; the
244+
// unique index would fail to create against violating rows. No-op on
245+
// fresh databases or when the unique index already exists.
246+
if err := dedupeSpansForUniqueIndex(db, driver); err != nil {
247+
log.Printf("⚠️ span dedupe before unique index failed: %v", err)
248+
}
249+
241250
migrateModels := []any{&Trace{}, &Span{}, &MetricBucket{}}
242251
if !logsPartitioned {
243252
migrateModels = append(migrateModels, &Log{})

internal/storage/migrate_spans.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package storage
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"strings"
7+
8+
"gorm.io/gorm"
9+
)
10+
11+
// dedupeSpansForUniqueIndex collapses any pre-existing duplicate spans on
12+
// (tenant_id, trace_id, span_id) before AutoMigrate adds the composite
13+
// uniqueIndex. Without this, CREATE UNIQUE INDEX would fail on databases
14+
// that accumulated duplicates from earlier DLQ replays (or any pre-RAN-65
15+
// non-idempotent ingest path), aborting startup.
16+
//
17+
// Strategy across drivers: keep the lowest primary-key row per
18+
// (tenant_id, trace_id, span_id), delete the rest. Idempotent — a fresh
19+
// database (or one that never collected duplicates) is a no-op.
20+
//
21+
// Must run BEFORE db.AutoMigrate(...) in AutoMigrateModels; once the
22+
// unique constraint is in place, the dedupe is unnecessary because new
23+
// inserts collapse via OnConflict.DoNothing.
24+
func dedupeSpansForUniqueIndex(db *gorm.DB, driver string) error {
25+
if db == nil {
26+
return nil
27+
}
28+
driver = strings.ToLower(driver)
29+
30+
// Skip if the spans table doesn't exist yet — fresh databases have
31+
// nothing to dedupe and the upcoming AutoMigrate will create the
32+
// table with the uniqueIndex already in place.
33+
if !db.Migrator().HasTable("spans") {
34+
return nil
35+
}
36+
37+
// Skip if the unique index already exists (idempotent re-runs).
38+
if db.Migrator().HasIndex("spans", "idx_spans_tenant_trace_span") {
39+
return nil
40+
}
41+
42+
var deleted int64
43+
switch driver {
44+
case "sqlite", "":
45+
// SQLite supports DELETE with subquery on same table.
46+
res := db.Exec(`DELETE FROM spans WHERE id NOT IN (
47+
SELECT MIN(id) FROM spans GROUP BY tenant_id, trace_id, span_id
48+
)`)
49+
if res.Error != nil {
50+
return fmt.Errorf("dedupe spans (sqlite): %w", res.Error)
51+
}
52+
deleted = res.RowsAffected
53+
54+
case "postgres", "postgresql":
55+
// USING self-join keeps the lowest id per (tenant, trace, span).
56+
res := db.Exec(`DELETE FROM spans a USING spans b
57+
WHERE a.id > b.id
58+
AND a.tenant_id = b.tenant_id
59+
AND a.trace_id = b.trace_id
60+
AND a.span_id = b.span_id`)
61+
if res.Error != nil {
62+
return fmt.Errorf("dedupe spans (postgres): %w", res.Error)
63+
}
64+
deleted = res.RowsAffected
65+
66+
case "mysql":
67+
// MySQL forbids referencing the target table directly inside a
68+
// DELETE subquery; route through a temp table to keep the
69+
// "minimum id wins" semantics portable.
70+
if err := db.Exec(`CREATE TEMPORARY TABLE _spans_dedupe_keep AS
71+
SELECT MIN(id) AS id FROM spans GROUP BY tenant_id, trace_id, span_id`).Error; err != nil {
72+
return fmt.Errorf("dedupe spans (mysql temp table): %w", err)
73+
}
74+
defer db.Exec("DROP TEMPORARY TABLE IF EXISTS _spans_dedupe_keep")
75+
res := db.Exec(`DELETE FROM spans
76+
WHERE id NOT IN (SELECT id FROM _spans_dedupe_keep)`)
77+
if res.Error != nil {
78+
return fmt.Errorf("dedupe spans (mysql): %w", res.Error)
79+
}
80+
deleted = res.RowsAffected
81+
82+
case "sqlserver", "mssql":
83+
// T-SQL: ROW_NUMBER() over the dedupe key, then delete duplicates.
84+
res := db.Exec(`WITH dups AS (
85+
SELECT id, ROW_NUMBER() OVER (
86+
PARTITION BY tenant_id, trace_id, span_id ORDER BY id
87+
) AS rn FROM spans
88+
) DELETE FROM spans WHERE id IN (SELECT id FROM dups WHERE rn > 1)`)
89+
if res.Error != nil {
90+
return fmt.Errorf("dedupe spans (mssql): %w", res.Error)
91+
}
92+
deleted = res.RowsAffected
93+
94+
default:
95+
return nil
96+
}
97+
98+
if deleted > 0 {
99+
log.Printf("🧹 Deduplicated %d duplicate span row(s) before adding uniqueIndex idx_spans_tenant_trace_span", deleted)
100+
}
101+
return nil
102+
}

internal/storage/models.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,19 @@ type Trace struct {
110110
}
111111

112112
// Span represents a single operation within a trace.
113+
//
114+
// Idempotency: the composite uniqueIndex idx_spans_tenant_trace_span on
115+
// (tenant_id, trace_id, span_id) ensures a span is written at most once
116+
// per tenant. DLQ replay (or any duplicate ingest) collapses cleanly via
117+
// OnConflict.DoNothing in BatchCreateAll/BatchCreateSpans rather than
118+
// double-counting in downstream metrics or GraphRAG. The composite covers
119+
// the legacy idx_spans_tenant_trace as a left-prefix; the legacy index
120+
// is retained for query-plan stability across upgrades.
113121
type Span struct {
114122
ID uint `gorm:"primaryKey" json:"id"`
115-
TenantID string `gorm:"size:64;default:'default';not null;index:idx_spans_tenant_trace,priority:1;index:idx_spans_tenant_service_start,priority:1" json:"tenant_id"`
116-
TraceID string `gorm:"size:32;not null;index:idx_spans_tenant_trace,priority:2" json:"trace_id"`
117-
SpanID string `gorm:"size:16;not null" json:"span_id"`
123+
TenantID string `gorm:"size:64;default:'default';not null;index:idx_spans_tenant_trace,priority:1;index:idx_spans_tenant_service_start,priority:1;uniqueIndex:idx_spans_tenant_trace_span,priority:1" json:"tenant_id"`
124+
TraceID string `gorm:"size:32;not null;index:idx_spans_tenant_trace,priority:2;uniqueIndex:idx_spans_tenant_trace_span,priority:2" json:"trace_id"`
125+
SpanID string `gorm:"size:16;not null;uniqueIndex:idx_spans_tenant_trace_span,priority:3" json:"span_id"`
118126
ParentSpanID string `gorm:"size:16" json:"parent_span_id"`
119127
OperationName string `gorm:"size:255;index" json:"operation_name"`
120128
StartTime time.Time `gorm:"index:idx_spans_tenant_service_start,priority:3" json:"start_time"`

0 commit comments

Comments
 (0)