diff --git a/CLAUDE.md b/CLAUDE.md index a410516..a49b649 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -202,6 +202,7 @@ docs/ # Specifications and plans Key settings in `internal/config/config.go`: - `HTTP_PORT` (8080), `GRPC_PORT` (4317), `DB_DRIVER` (sqlite), `DB_DSN` - `DB_AUTOMIGRATE` (true), `DB_MAX_OPEN_CONNS`, `DB_MAX_IDLE_CONNS`, `DB_CONN_MAX_LIFETIME` (internally capped to 30m when `DB_AZURE_AUTH=true`) +- `DB_SQLITE_READ_POOL_SIZE` (4) — only effective on `DB_DRIVER=sqlite`. Opens a second `*gorm.DB` against the same DB file with `MaxOpen=N` and `PRAGMA query_only=ON`, used by `Repository.reader()` for all read-only queries (`Get*`, `Recent*`, `Search*`, dashboard/metrics/heatmap, GraphRAG span loader, FTS5 search). The writer pool keeps `MaxOpen=1` (SQLite's single-writer model), so reads no longer queue behind in-flight writes/retention/`VACUUM`. Range `[1, 32]`; values `0`, `off`, `false`, `no` disable the pool and reads fall back to the writer. Non-SQLite drivers ignore this setting. - `DB_AZURE_AUTH` (false) — see Authentication below - `TLS_CERT_FILE`, `TLS_KEY_FILE` — explicit TLS (both or neither) - `TLS_AUTO_SELFSIGNED` (false), `TLS_CACHE_DIR` (`./data/tls`) — self-signed bootstrap, ignored if cert files set diff --git a/internal/storage/archive_repo.go b/internal/storage/archive_repo.go index 6d3490a..d7b19bf 100644 --- a/internal/storage/archive_repo.go +++ b/internal/storage/archive_repo.go @@ -3,21 +3,22 @@ package storage // HotDBSizeBytes returns an approximate size of the hot DB in bytes. // For SQLite this reads the file size. For others it queries pg_database_size / information_schema. func (r *Repository) HotDBSizeBytes() int64 { + rdr := r.reader() switch r.driver { case "sqlite", "": var pageCount, pageSize int64 - r.db.Raw("PRAGMA page_count").Scan(&pageCount) - r.db.Raw("PRAGMA page_size").Scan(&pageSize) + rdr.Raw("PRAGMA page_count").Scan(&pageCount) + rdr.Raw("PRAGMA page_size").Scan(&pageSize) return pageCount * pageSize case "postgres", "postgresql": var size int64 - r.db.Raw("SELECT pg_database_size(current_database())").Scan(&size) + rdr.Raw("SELECT pg_database_size(current_database())").Scan(&size) return size case "mysql": var size int64 - r.db.Raw(`SELECT SUM(data_length + index_length) FROM information_schema.tables + rdr.Raw(`SELECT SUM(data_length + index_length) FROM information_schema.tables WHERE table_schema = DATABASE()`).Scan(&size) return size diff --git a/internal/storage/factory.go b/internal/storage/factory.go index 8ba94a9..b786ca4 100644 --- a/internal/storage/factory.go +++ b/internal/storage/factory.go @@ -137,6 +137,58 @@ func NewDatabase(driver, dsn string) (*gorm.DB, error) { return db, nil } +// NewSQLiteReadPool opens a SECOND GORM handle against the same SQLite file, +// configured with N connections in the pool. Used to alleviate the +// MaxOpen=1 single-writer pool's read/write contention: API/MCP read +// handlers route through this pool, while the writer keeps its +// single-connection serialized pool. WAL mode (already enabled on the +// writer pool) gives each reader connection its own snapshot, so they +// don't block on the writer or on each other. +// +// Caveats: +// - Caller must ensure NewDatabase has already been called against the +// same DSN so WAL/journal-mode/synchronous pragmas are in place. The +// read pool inherits those at the file level. +// - maxOpen is clamped to [1, 32]. A non-positive maxOpen passed to this +// function returns a pool of size 4 (default). Note: the production +// caller (NewRepository) short-circuits size == 0 to mean "read pool +// disabled" and does not invoke this factory in that case — so the +// "0 → 4" path here is only reachable from tests / direct callers. +// - Driver string must already be normalised to "sqlite". Other drivers +// should not be opened through here — they have their own pool config. +func NewSQLiteReadPool(dsn string, maxOpen int) (*gorm.DB, error) { + if dsn == "" { + dsn = "OtelContext.db" + } + if maxOpen <= 0 { + maxOpen = 4 + } + if maxOpen > 32 { + maxOpen = 32 + } + db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Error), + DisableForeignKeyConstraintWhenMigrating: true, + }) + if err != nil { + return nil, fmt.Errorf("sqlite read-pool open: %s", scrubDSN(err.Error())) + } + // Defensive — WAL was already set on the writer; idempotent no-op here + // but harmless if the read-pool happens to be opened first. + db.Exec("PRAGMA journal_mode=WAL") + db.Exec("PRAGMA busy_timeout=5000") + db.Exec("PRAGMA query_only=ON") + sqlDB, err := db.DB() + if err != nil { + return nil, fmt.Errorf("sqlite read-pool: get *sql.DB: %w", err) + } + sqlDB.SetMaxOpenConns(maxOpen) + sqlDB.SetMaxIdleConns(maxOpen) + sqlDB.SetConnMaxLifetime(time.Hour) + log.Printf("📖 SQLite read-pool enabled: MaxOpen=%d, query_only=ON", maxOpen) + return db, nil +} + func getEnvPoolInt(key string, fallback int) int { if v, ok := os.LookupEnv(key); ok { if i, err := strconv.Atoi(v); err == nil { diff --git a/internal/storage/graph_repo.go b/internal/storage/graph_repo.go index a88df36..0860101 100644 --- a/internal/storage/graph_repo.go +++ b/internal/storage/graph_repo.go @@ -32,7 +32,7 @@ func (r *Repository) GetSpansForGraph(since time.Time) ([]SpanGraphRow, error) { } var rows []raw - err := r.db. + err := r.reader(). Table("spans"). Select("spans.span_id, spans.parent_span_id, spans.service_name, spans.operation_name, spans.duration, traces.status AS trace_status, spans.start_time"). Joins("LEFT JOIN traces ON traces.trace_id = spans.trace_id"). diff --git a/internal/storage/log_repo.go b/internal/storage/log_repo.go index e26c2ad..40a88ce 100644 --- a/internal/storage/log_repo.go +++ b/internal/storage/log_repo.go @@ -44,7 +44,7 @@ func (r *Repository) BatchCreateLogs(logs []Log) error { func (r *Repository) GetLog(ctx context.Context, id uint) (*Log, error) { tenant := TenantFromContext(ctx) var l Log - if err := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).First(&l, id).Error; err != nil { + if err := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).First(&l, id).Error; err != nil { return nil, fmt.Errorf("failed to get log: %w", err) } return &l, nil @@ -54,7 +54,7 @@ func (r *Repository) GetLog(ctx context.Context, id uint) (*Log, error) { func (r *Repository) GetRecentLogs(ctx context.Context, limit int) ([]Log, error) { tenant := TenantFromContext(ctx) var logs []Log - if err := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error; err != nil { + if err := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error; err != nil { return nil, fmt.Errorf("failed to get recent logs: %w", err) } return logs, nil @@ -81,7 +81,7 @@ func (r *Repository) GetLogsV2(ctx context.Context, filter LogFilter) ([]Log, in } } - base := r.db.WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant) + base := r.reader().WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant) if useFTS5 { base = base.Joins("JOIN "+fts5LogsTable+" ON logs.id = "+fts5LogsTable+".rowid"). Where(fts5LogsTable+" MATCH ?", matchExpr) @@ -153,7 +153,7 @@ func applyLogFilterCriteria(base *gorm.DB, filter LogFilter) *gorm.DB { func (r *Repository) getLogsV2LikeFallback(ctx context.Context, filter LogFilter, tenant string) ([]Log, int64, error) { var logs []Log var total int64 - base := r.db.WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant) + base := r.reader().WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant) base = applyLogFilterCriteria(base, filter) if filter.Search != "" { search := "%" + filter.Search + "%" @@ -180,7 +180,7 @@ func (r *Repository) GetLogContext(ctx context.Context, targetTime time.Time) ([ end := targetTime.Add(1 * time.Minute) var logs []Log - if err := r.db.WithContext(ctx).Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end). + if err := r.reader().WithContext(ctx).Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end). Order("timestamp asc"). Find(&logs).Error; err != nil { return nil, fmt.Errorf("failed to fetch log context: %w", err) @@ -223,7 +223,7 @@ func (r *Repository) LogsForVectorReplay(ctx context.Context, sinceID uint, limi limit = 10_000 } var logs []Log - err := r.db.WithContext(ctx). + err := r.reader().WithContext(ctx). Where("id > ? AND severity IN ?", sinceID, []string{"ERROR", "WARN", "WARNING", "FATAL", "CRITICAL"}). Order("id ASC"). Limit(limit). @@ -244,7 +244,7 @@ func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, s if limit <= 0 { limit = 5000 } - q := r.db.WithContext(ctx).Model(&Log{}) + q := r.reader().WithContext(ctx).Model(&Log{}) if severity != "" { q = q.Where(sqlWhereSeverity, severity) } diff --git a/internal/storage/metrics_repo.go b/internal/storage/metrics_repo.go index fd8e2e9..8a611c9 100644 --- a/internal/storage/metrics_repo.go +++ b/internal/storage/metrics_repo.go @@ -68,7 +68,7 @@ func (r *Repository) BatchCreateMetrics(buckets []MetricBucket) error { func (r *Repository) GetMetricBuckets(ctx context.Context, start, end time.Time, serviceName string, metricName string) ([]MetricBucket, error) { tenant := TenantFromContext(ctx) var buckets []MetricBucket - query := r.db.WithContext(ctx).Where("tenant_id = ? AND time_bucket BETWEEN ? AND ?", tenant, start, end) + query := r.reader().WithContext(ctx).Where("tenant_id = ? AND time_bucket BETWEEN ? AND ?", tenant, start, end) if serviceName != "" { query = query.Where("service_name = ?", serviceName) } @@ -86,7 +86,7 @@ func (r *Repository) GetMetricBuckets(ctx context.Context, start, end time.Time, func (r *Repository) GetMetricNames(ctx context.Context, serviceName string) ([]string, error) { tenant := TenantFromContext(ctx) var names []string - query := r.db.WithContext(ctx).Model(&MetricBucket{}).Where("tenant_id = ?", tenant) + query := r.reader().WithContext(ctx).Model(&MetricBucket{}).Where("tenant_id = ?", tenant) if serviceName != "" { query = query.Where("service_name = ?", serviceName) } @@ -182,7 +182,7 @@ func (r *Repository) GetDashboardStats(ctx context.Context, start, end time.Time tenant := TenantFromContext(ctx) var stats DashboardStats - baseQuery := r.db.WithContext(ctx).Model(&Trace{}).Where(sqlWhereTenantTimeBetween, tenant, start, end) + baseQuery := r.reader().WithContext(ctx).Model(&Trace{}).Where(sqlWhereTenantTimeBetween, tenant, start, end) if len(serviceNames) > 0 { baseQuery = baseQuery.Where(sqlWhereServiceIn, serviceNames) } @@ -193,7 +193,7 @@ func (r *Repository) GetDashboardStats(ctx context.Context, start, end time.Time } // 2. Total Logs - logQuery := r.db.WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantTimeBetween, tenant, start, end) + logQuery := r.reader().WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantTimeBetween, tenant, start, end) if len(serviceNames) > 0 { logQuery = logQuery.Where(sqlWhereServiceIn, serviceNames) } @@ -285,7 +285,7 @@ func (r *Repository) GetTrafficMetrics(ctx context.Context, start, end time.Time } var rows []traceRow - query := r.db.WithContext(ctx).Model(&Trace{}). + query := r.reader().WithContext(ctx).Model(&Trace{}). Select("timestamp, status"). Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end) @@ -335,7 +335,7 @@ func (r *Repository) GetTrafficMetrics(ctx context.Context, start, end time.Time func (r *Repository) GetLatencyHeatmap(ctx context.Context, start, end time.Time, serviceNames []string) ([]LatencyPoint, error) { tenant := TenantFromContext(ctx) var points []LatencyPoint - query := r.db.WithContext(ctx).Model(&Trace{}). + query := r.reader().WithContext(ctx).Model(&Trace{}). Select("timestamp, duration"). Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end) @@ -393,7 +393,7 @@ func (r *Repository) PurgeMetricBucketsBatched(ctx context.Context, olderThan ti func (r *Repository) GetServices(ctx context.Context) ([]string, error) { tenant := TenantFromContext(ctx) var services []string - if err := r.db.WithContext(ctx).Model(&Trace{}). + if err := r.reader().WithContext(ctx).Model(&Trace{}). Where("tenant_id = ?", tenant). Distinct("service_name"). Order("service_name ASC"). diff --git a/internal/storage/repository.go b/internal/storage/repository.go index bd886b6..74b32b6 100644 --- a/internal/storage/repository.go +++ b/internal/storage/repository.go @@ -99,6 +99,14 @@ type Repository struct { driver string metrics *telemetry.Metrics + // readPool is an optional second GORM handle used for read-only queries + // on SQLite. The writer (`db`) keeps MaxOpen=1 to honour SQLite's + // single-writer model; without a separate pool, every API/MCP read + // serializes behind in-flight writes. WAL mode gives each readPool + // connection its own snapshot. Nil for non-SQLite drivers and when the + // read pool is explicitly disabled (DB_SQLITE_READ_POOL_SIZE=0). + readPool *gorm.DB + // logsPartitioned is set to true when DB_POSTGRES_PARTITIONING=daily is // active and the `logs` parent has been provisioned as a partitioned // table. RetentionScheduler reads this to skip the logs DELETE — the @@ -121,6 +129,26 @@ func (r *Repository) LogsPartitioned() bool { return r.logsPartitioned.Load() } // setup path (factory.go) once the partitioned schema is in place. func (r *Repository) MarkLogsPartitioned() { r.logsPartitioned.Store(true) } +// reader returns the connection to use for read-only queries. Falls back to +// the writer pool when no read pool is configured (non-SQLite drivers, or +// when the read pool was explicitly disabled). +// +// Read methods that route through here are non-blocking against in-flight +// writes on SQLite; methods that still use r.db serialize through the +// MaxOpen=1 writer pool, which is correct for transactional / write paths. +// +// WAL snapshot caveat: each non-transactional statement on a read-pool +// connection sees the latest committed WAL frames. Do NOT wrap multiple +// reader().… calls in a long-lived read transaction (Begin / Session with +// PrepareStmt) — that pins a snapshot taken at the first statement and +// will hide subsequent writer commits for the lifetime of the txn. +func (r *Repository) reader() *gorm.DB { + if r.readPool != nil { + return r.readPool + } + return r.db +} + // NewRepository initializes the database connection using environment variables and migrates the schema. func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { driver := os.Getenv("DB_DRIVER") @@ -174,6 +202,25 @@ func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { } repo := &Repository{db: db, driver: driver, metrics: metrics} + + // On SQLite, open a separate read pool so API/MCP reads don't serialize + // behind writes through the MaxOpen=1 writer pool. Disabled when + // DB_SQLITE_READ_POOL_SIZE=0; otherwise size is clamped to [1,32] + // (default 4) inside NewSQLiteReadPool. + if driver == "sqlite" { + size := sqliteReadPoolSizeFromEnv() + if size > 0 { + rp, err := NewSQLiteReadPool(dsn, size) + if err != nil { + slog.Warn("SQLite read-pool unavailable, falling back to writer pool for reads", + "error", err, + ) + } else { + repo.readPool = rp + } + } + } + // Detect partitioned-logs mode from the live schema so the // RetentionScheduler can skip the row-level DELETE path. We do this from // the DB rather than passing the config flag through several layers, @@ -188,13 +235,33 @@ func NewRepository(metrics *telemetry.Metrics) (*Repository, error) { return repo, nil } +// sqliteReadPoolSizeFromEnv reads DB_SQLITE_READ_POOL_SIZE. Returns: +// - 0 when the env var is set to "0" or "off" (read pool disabled) +// - the parsed integer when set to a positive number +// - 4 (default) when unset or unparseable +func sqliteReadPoolSizeFromEnv() int { + v, ok := os.LookupEnv("DB_SQLITE_READ_POOL_SIZE") + if !ok { + return 4 + } + s := strings.TrimSpace(strings.ToLower(v)) + if s == "off" || s == "false" || s == "no" { + return 0 + } + n, err := strconv.Atoi(s) + if err != nil || n < 0 { + return 4 + } + return n +} + // Stats aggregation and DB management // GetStats returns high-level database stats scoped to the tenant carried on ctx. // Unscoped aggregates (DB size, etc.) are not tenant-specific and are reported as-is. func (r *Repository) GetStats(ctx context.Context) (map[string]any, error) { tenant := TenantFromContext(ctx) - db := r.db.WithContext(ctx) + db := r.reader().WithContext(ctx) var traceCount int64 var logCount int64 @@ -233,8 +300,8 @@ func (r *Repository) GetStats(ctx context.Context) (map[string]any, error) { var dbSizeMB float64 if r.driver == "sqlite" { var pageCount, pageSize int64 - r.db.Raw("PRAGMA page_count").Scan(&pageCount) - r.db.Raw("PRAGMA page_size").Scan(&pageSize) + r.reader().Raw("PRAGMA page_count").Scan(&pageCount) + r.reader().Raw("PRAGMA page_size").Scan(&pageSize) dbSizeMB = float64(pageCount*pageSize) / (1024 * 1024) } @@ -263,8 +330,21 @@ func (r *Repository) VacuumDB() error { return nil } -// Close closes the underlying database connection. +// Close closes the underlying database connection. When a SQLite read pool was +// opened, it is closed first; the writer pool is closed last so any in-flight +// reads see the writer's WAL state through the close sequence. func (r *Repository) Close() error { + if r.readPool != nil { + rp, err := r.readPool.DB() + switch { + case err != nil: + slog.Warn("read pool: get underlying sql.DB failed during close", "error", err) + default: + if cerr := rp.Close(); cerr != nil { + slog.Warn("read pool: close failed", "error", cerr) + } + } + } sqlDB, err := r.db.DB() if err != nil { return fmt.Errorf("failed to get underlying sql.DB: %w", err) @@ -279,6 +359,11 @@ func (r *Repository) DB() *gorm.DB { // NewRepositoryFromDB constructs a Repository from an existing *gorm.DB. // Intended for tests and advanced wiring — production code should use NewRepository. +// +// Note: the SQLite read-pool optimization (DB_SQLITE_READ_POOL_SIZE) is +// bypassed here. Tests that hit this constructor see the legacy single-pool +// (MaxOpen=1) behavior — fine for correctness, but contention scenarios +// must use NewRepository to exercise the read pool. func NewRepositoryFromDB(db *gorm.DB, driver string) *Repository { if driver == "" { driver = "sqlite" @@ -290,7 +375,7 @@ func NewRepositoryFromDB(db *gorm.DB, driver string) *Repository { func (r *Repository) RecentTraces(ctx context.Context, limit int) ([]Trace, error) { tenant := TenantFromContext(ctx) var traces []Trace - if err := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&traces).Error; err != nil { + if err := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&traces).Error; err != nil { return nil, err } return traces, nil @@ -300,7 +385,7 @@ func (r *Repository) RecentTraces(ctx context.Context, limit int) ([]Trace, erro func (r *Repository) RecentLogs(ctx context.Context, limit int) ([]Log, error) { tenant := TenantFromContext(ctx) var logs []Log - if err := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error; err != nil { + if err := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error; err != nil { return nil, err } return logs, nil @@ -318,7 +403,7 @@ func (r *Repository) SearchLogs(ctx context.Context, query string, limit int) ([ return r.searchLogsFTS5(ctx, tenant, query, limit) } var logs []Log - db := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit) + db := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit) if query != "" { op := r.likeOp() db = db.Where(fmt.Sprintf("body %s ? OR service_name %s ?", op, op), "%"+query+"%", "%"+query+"%") @@ -337,11 +422,11 @@ func (r *Repository) searchLogsFTS5(ctx context.Context, tenant, query string, l matchExpr := fts5MatchExpr(query) if matchExpr == "" { var logs []Log - err := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error + err := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error return logs, err } var logs []Log - err := r.db.WithContext(ctx). + err := r.reader().WithContext(ctx). Table("logs"). Joins("JOIN "+fts5LogsTable+" ON logs.id = "+fts5LogsTable+".rowid"). Where("logs.tenant_id = ? AND "+fts5LogsTable+" MATCH ?", tenant, matchExpr). @@ -365,7 +450,7 @@ func (r *Repository) searchLogsFTS5(ctx context.Context, tenant, query string, l func (r *Repository) searchLogsLikeFallback(ctx context.Context, tenant, query string, limit int) ([]Log, error) { var logs []Log op := r.likeOp() - err := r.db.WithContext(ctx). + err := r.reader().WithContext(ctx). Where("tenant_id = ?", tenant). Where(fmt.Sprintf("body %s ? OR service_name %s ?", op, op), "%"+query+"%", "%"+query+"%"). Order(sqlOrderTimestampDesc). diff --git a/internal/storage/sqlite_read_pool_test.go b/internal/storage/sqlite_read_pool_test.go new file mode 100644 index 0000000..9e2eb60 --- /dev/null +++ b/internal/storage/sqlite_read_pool_test.go @@ -0,0 +1,216 @@ +package storage + +import ( + "context" + "os" + "path/filepath" + "slices" + "sync" + "testing" + "time" +) + +func TestSQLiteReadPoolSizeFromEnv(t *testing.T) { + cases := []struct { + name string + set bool + val string + want int + }{ + {name: "unset_default", set: false, want: 4}, + {name: "empty_string_falls_to_default", set: true, val: "", want: 4}, + {name: "zero_disables", set: true, val: "0", want: 0}, + {name: "off_disables", set: true, val: "off", want: 0}, + {name: "OFF_disables", set: true, val: "OFF", want: 0}, + {name: "false_disables", set: true, val: "false", want: 0}, + {name: "no_disables", set: true, val: "no", want: 0}, + {name: "positive_int", set: true, val: "12", want: 12}, + {name: "garbage_falls_to_default", set: true, val: "not-a-number", want: 4}, + {name: "negative_falls_to_default", set: true, val: "-5", want: 4}, + {name: "whitespace_trimmed", set: true, val: " 8 ", want: 8}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if tc.set { + t.Setenv("DB_SQLITE_READ_POOL_SIZE", tc.val) + } else { + // Truly unset — exercises the !ok branch in sqliteReadPoolSizeFromEnv. + // t.Setenv with an existing parent value would set it; explicitly + // unset and restore via t.Cleanup so other sub-tests are unaffected. + prev, hadPrev := os.LookupEnv("DB_SQLITE_READ_POOL_SIZE") + os.Unsetenv("DB_SQLITE_READ_POOL_SIZE") + t.Cleanup(func() { + if hadPrev { + os.Setenv("DB_SQLITE_READ_POOL_SIZE", prev) + } else { + os.Unsetenv("DB_SQLITE_READ_POOL_SIZE") + } + }) + } + got := sqliteReadPoolSizeFromEnv() + if got != tc.want { + t.Fatalf("sqliteReadPoolSizeFromEnv(%q) = %d, want %d", tc.val, got, tc.want) + } + }) + } +} + +// TestNewSQLiteReadPool_QueryOnly verifies the read pool is provisioned with +// query_only=ON, so a buggy caller that routes a write through reader() gets +// a fast SQLite error rather than silently corrupting state. +func TestNewSQLiteReadPool_QueryOnly(t *testing.T) { + dir := t.TempDir() + dsn := filepath.Join(dir, "rp.db") + + // Writer pool first — that's the migration owner. + writer, err := NewDatabase("sqlite", dsn) + if err != nil { + t.Fatalf("NewDatabase writer: %v", err) + } + t.Cleanup(func() { _ = closeDB(writer) }) + if err := AutoMigrateModels(writer, "sqlite"); err != nil { + t.Fatalf("migrate: %v", err) + } + + rp, err := NewSQLiteReadPool(dsn, 4) + if err != nil { + t.Fatalf("NewSQLiteReadPool: %v", err) + } + t.Cleanup(func() { _ = closeDB(rp) }) + + // SELECT must succeed. + var count int64 + if err := rp.Model(&Log{}).Count(&count).Error; err != nil { + t.Fatalf("read-pool SELECT failed: %v", err) + } + + // INSERT must fail because query_only=ON. + err = rp.Create(&Log{Body: "should-fail", ServiceName: "svc", Severity: "INFO", Timestamp: time.Now()}).Error + if err == nil { + t.Fatal("expected query_only INSERT to fail; it succeeded") + } +} + +// TestRepository_ReaderFallback covers both branches of reader(): +// - readPool == nil → falls back to writer +// - readPool != nil → returns the read pool +func TestRepository_ReaderFallback(t *testing.T) { + dir := t.TempDir() + dsn := filepath.Join(dir, "fb.db") + + writer, err := NewDatabase("sqlite", dsn) + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + t.Cleanup(func() { _ = closeDB(writer) }) + if err := AutoMigrateModels(writer, "sqlite"); err != nil { + t.Fatalf("migrate: %v", err) + } + + repo := &Repository{db: writer, driver: "sqlite"} + if repo.reader() != writer { + t.Fatal("reader() should fall back to writer when readPool is nil") + } + + rp, err := NewSQLiteReadPool(dsn, 2) + if err != nil { + t.Fatalf("NewSQLiteReadPool: %v", err) + } + repo.readPool = rp + t.Cleanup(func() { _ = closeDB(rp) }) + + if repo.reader() != rp { + t.Fatal("reader() should return readPool when it is set") + } +} + +// TestSQLiteReadPool_ConcurrentReadsAndWrites runs N parallel reads through +// the read pool while a background goroutine pumps writes through the writer. +// All reads must succeed and the run must complete within a generous deadline, +// proving the writer's MaxOpen=1 connection is not on the read critical path. +func TestSQLiteReadPool_ConcurrentReadsAndWrites(t *testing.T) { + dir := t.TempDir() + dsn := filepath.Join(dir, "concurrent.db") + + writer, err := NewDatabase("sqlite", dsn) + if err != nil { + t.Fatalf("NewDatabase: %v", err) + } + t.Cleanup(func() { _ = closeDB(writer) }) + if err := AutoMigrateModels(writer, "sqlite"); err != nil { + t.Fatalf("migrate: %v", err) + } + + seedLogs(t, writer, 64, time.Now(), "svc-rp") + + rp, err := NewSQLiteReadPool(dsn, 4) + if err != nil { + t.Fatalf("NewSQLiteReadPool: %v", err) + } + t.Cleanup(func() { _ = closeDB(rp) }) + + repo := &Repository{db: writer, driver: "sqlite", readPool: rp} + + // Identity-based assertions: prove the read path is wired through rp, + // not silently routed back through the writer pool. + if repo.reader() == writer { + t.Fatal("repo.reader() returned writer pool — read pool not wired") + } + if repo.reader() != rp { + t.Fatal("repo.reader() did not return the configured read pool") + } + + stop := make(chan struct{}) + writerDone := make(chan struct{}) + go func() { + defer close(writerDone) + for { + select { + case <-stop: + return + default: + } + _ = writer.Create(&Log{ + TraceID: "t", SpanID: "s", Severity: "INFO", + Body: "writer-pump", ServiceName: "svc-rp", Timestamp: time.Now(), + }).Error + } + }() + + const readers = 8 + const readsEach = 20 + var wg sync.WaitGroup + wg.Add(readers) + var latMu sync.Mutex + lats := make([]time.Duration, 0, readers*readsEach) + for range readers { + go func() { + defer wg.Done() + for range readsEach { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + t0 := time.Now() + _, err := repo.RecentLogs(ctx, 8) + cancel() + if err != nil { + t.Errorf("RecentLogs concurrent read failed: %v", err) + return + } + latMu.Lock() + lats = append(lats, time.Since(t0)) + latMu.Unlock() + } + }() + } + wg.Wait() + close(stop) + <-writerDone + + // p99 latency check — well above the writer's MaxOpen=1 contention window + // but below what a serialized pool would produce when the writer is hot. + slices.Sort(lats) + p99 := lats[len(lats)*99/100] + t.Logf("concurrent read pool: n=%d p50=%s p99=%s max=%s", len(lats), lats[len(lats)/2], p99, lats[len(lats)-1]) + if p99 > 200*time.Millisecond { + t.Fatalf("p99 read latency %s exceeds 200ms — read pool likely serialized", p99) + } +} diff --git a/internal/storage/trace_repo.go b/internal/storage/trace_repo.go index f0ae992..3519a2d 100644 --- a/internal/storage/trace_repo.go +++ b/internal/storage/trace_repo.go @@ -146,7 +146,7 @@ func (r *Repository) CreateTrace(trace Trace) error { func (r *Repository) GetTrace(ctx context.Context, traceID string) (*Trace, error) { tenant := TenantFromContext(ctx) var trace Trace - if err := r.db.WithContext(ctx). + if err := r.reader().WithContext(ctx). Preload("Spans", sqlWhereTenantID, tenant). Preload("Logs", sqlWhereTenantID, tenant). Where("tenant_id = ? AND trace_id = ?", tenant, traceID). @@ -171,7 +171,7 @@ func (r *Repository) GetTracesFiltered(ctx context.Context, start, end time.Time var traces []Trace var total int64 - base := r.db.WithContext(ctx).Model(&Trace{}).Where(sqlWhereTenantID, tenant) + base := r.reader().WithContext(ctx).Model(&Trace{}).Where(sqlWhereTenantID, tenant) if !start.IsZero() && !end.IsZero() { base = base.Where("timestamp BETWEEN ? AND ?", start, end) @@ -225,7 +225,7 @@ func (r *Repository) GetTracesFiltered(ctx context.Context, start, end time.Time } var summaries []spanSummary - r.db.WithContext(ctx).Raw( + r.reader().WithContext(ctx).Raw( `SELECT trace_id, COUNT(*) as span_count, MIN(operation_name) as operation_name FROM spans WHERE tenant_id = ? AND trace_id IN ? GROUP BY trace_id`, tenant, traceIDs, ).Scan(&summaries) @@ -262,7 +262,7 @@ const serviceMapSpanLimit = 500_000 func (r *Repository) GetServiceMapMetrics(ctx context.Context, start, end time.Time) (*ServiceMapMetrics, error) { tenant := TenantFromContext(ctx) var spans []Span - query := r.db.WithContext(ctx).Model(&Span{}).Where(sqlWhereTenantID, tenant) + query := r.reader().WithContext(ctx).Model(&Span{}).Where(sqlWhereTenantID, tenant) if !start.IsZero() && !end.IsZero() { query = query.Where("start_time BETWEEN ? AND ?", start, end)