Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ docs/ # Specifications and plans
Key settings in `internal/config/config.go`:
- `HTTP_PORT` (8080), `GRPC_PORT` (4317), `DB_DRIVER` (sqlite), `DB_DSN`
- `DB_AUTOMIGRATE` (true), `DB_MAX_OPEN_CONNS`, `DB_MAX_IDLE_CONNS`, `DB_CONN_MAX_LIFETIME` (internally capped to 30m when `DB_AZURE_AUTH=true`)
- `DB_SQLITE_READ_POOL_SIZE` (4) — only effective on `DB_DRIVER=sqlite`. Opens a second `*gorm.DB` against the same DB file with `MaxOpen=N` and `PRAGMA query_only=ON`, used by `Repository.reader()` for all read-only queries (`Get*`, `Recent*`, `Search*`, dashboard/metrics/heatmap, GraphRAG span loader, FTS5 search). The writer pool keeps `MaxOpen=1` (SQLite's single-writer model), so reads no longer queue behind in-flight writes/retention/`VACUUM`. Range `[1, 32]`; values `0`, `off`, `false`, `no` disable the pool and reads fall back to the writer. Non-SQLite drivers ignore this setting.
- `DB_AZURE_AUTH` (false) — see Authentication below
- `TLS_CERT_FILE`, `TLS_KEY_FILE` — explicit TLS (both or neither)
- `TLS_AUTO_SELFSIGNED` (false), `TLS_CACHE_DIR` (`./data/tls`) — self-signed bootstrap, ignored if cert files set
Expand Down
9 changes: 5 additions & 4 deletions internal/storage/archive_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ package storage
// HotDBSizeBytes returns an approximate size of the hot DB in bytes.
// For SQLite this reads the file size. For others it queries pg_database_size / information_schema.
func (r *Repository) HotDBSizeBytes() int64 {
rdr := r.reader()
switch r.driver {
case "sqlite", "":
var pageCount, pageSize int64
r.db.Raw("PRAGMA page_count").Scan(&pageCount)
r.db.Raw("PRAGMA page_size").Scan(&pageSize)
rdr.Raw("PRAGMA page_count").Scan(&pageCount)
rdr.Raw("PRAGMA page_size").Scan(&pageSize)
return pageCount * pageSize

case "postgres", "postgresql":
var size int64
r.db.Raw("SELECT pg_database_size(current_database())").Scan(&size)
rdr.Raw("SELECT pg_database_size(current_database())").Scan(&size)
return size

case "mysql":
var size int64
r.db.Raw(`SELECT SUM(data_length + index_length) FROM information_schema.tables
rdr.Raw(`SELECT SUM(data_length + index_length) FROM information_schema.tables
WHERE table_schema = DATABASE()`).Scan(&size)
return size

Expand Down
52 changes: 52 additions & 0 deletions internal/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,58 @@ func NewDatabase(driver, dsn string) (*gorm.DB, error) {
return db, nil
}

// NewSQLiteReadPool opens a SECOND GORM handle against the same SQLite file,
// configured with N connections in the pool. Used to alleviate the
// MaxOpen=1 single-writer pool's read/write contention: API/MCP read
// handlers route through this pool, while the writer keeps its
// single-connection serialized pool. WAL mode (already enabled on the
// writer pool) gives each reader connection its own snapshot, so they
// don't block on the writer or on each other.
//
// Caveats:
// - Caller must ensure NewDatabase has already been called against the
// same DSN so WAL/journal-mode/synchronous pragmas are in place. The
// read pool inherits those at the file level.
// - maxOpen is clamped to [1, 32]. A non-positive maxOpen passed to this
// function returns a pool of size 4 (default). Note: the production
// caller (NewRepository) short-circuits size == 0 to mean "read pool
// disabled" and does not invoke this factory in that case — so the
// "0 → 4" path here is only reachable from tests / direct callers.
// - Driver string must already be normalised to "sqlite". Other drivers
// should not be opened through here — they have their own pool config.
func NewSQLiteReadPool(dsn string, maxOpen int) (*gorm.DB, error) {
if dsn == "" {
dsn = "OtelContext.db"
}
if maxOpen <= 0 {
maxOpen = 4
}
if maxOpen > 32 {
maxOpen = 32
}
db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{
Logger: logger.Default.LogMode(logger.Error),
DisableForeignKeyConstraintWhenMigrating: true,
})
if err != nil {
return nil, fmt.Errorf("sqlite read-pool open: %s", scrubDSN(err.Error()))
}
// Defensive — WAL was already set on the writer; idempotent no-op here
// but harmless if the read-pool happens to be opened first.
db.Exec("PRAGMA journal_mode=WAL")
db.Exec("PRAGMA busy_timeout=5000")
db.Exec("PRAGMA query_only=ON")
sqlDB, err := db.DB()
if err != nil {
return nil, fmt.Errorf("sqlite read-pool: get *sql.DB: %w", err)
}
sqlDB.SetMaxOpenConns(maxOpen)
sqlDB.SetMaxIdleConns(maxOpen)
sqlDB.SetConnMaxLifetime(time.Hour)
log.Printf("📖 SQLite read-pool enabled: MaxOpen=%d, query_only=ON", maxOpen)
return db, nil
}

func getEnvPoolInt(key string, fallback int) int {
if v, ok := os.LookupEnv(key); ok {
if i, err := strconv.Atoi(v); err == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/graph_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (r *Repository) GetSpansForGraph(since time.Time) ([]SpanGraphRow, error) {
}

var rows []raw
err := r.db.
err := r.reader().
Table("spans").
Select("spans.span_id, spans.parent_span_id, spans.service_name, spans.operation_name, spans.duration, traces.status AS trace_status, spans.start_time").
Joins("LEFT JOIN traces ON traces.trace_id = spans.trace_id").
Expand Down
14 changes: 7 additions & 7 deletions internal/storage/log_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r *Repository) BatchCreateLogs(logs []Log) error {
func (r *Repository) GetLog(ctx context.Context, id uint) (*Log, error) {
tenant := TenantFromContext(ctx)
var l Log
if err := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).First(&l, id).Error; err != nil {
if err := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).First(&l, id).Error; err != nil {
return nil, fmt.Errorf("failed to get log: %w", err)
}
return &l, nil
Expand All @@ -54,7 +54,7 @@ func (r *Repository) GetLog(ctx context.Context, id uint) (*Log, error) {
func (r *Repository) GetRecentLogs(ctx context.Context, limit int) ([]Log, error) {
tenant := TenantFromContext(ctx)
var logs []Log
if err := r.db.WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error; err != nil {
if err := r.reader().WithContext(ctx).Where(sqlWhereTenantID, tenant).Order(sqlOrderTimestampDesc).Limit(limit).Find(&logs).Error; err != nil {
return nil, fmt.Errorf("failed to get recent logs: %w", err)
}
return logs, nil
Expand All @@ -81,7 +81,7 @@ func (r *Repository) GetLogsV2(ctx context.Context, filter LogFilter) ([]Log, in
}
}

base := r.db.WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant)
base := r.reader().WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant)
if useFTS5 {
base = base.Joins("JOIN "+fts5LogsTable+" ON logs.id = "+fts5LogsTable+".rowid").
Where(fts5LogsTable+" MATCH ?", matchExpr)
Expand Down Expand Up @@ -153,7 +153,7 @@ func applyLogFilterCriteria(base *gorm.DB, filter LogFilter) *gorm.DB {
func (r *Repository) getLogsV2LikeFallback(ctx context.Context, filter LogFilter, tenant string) ([]Log, int64, error) {
var logs []Log
var total int64
base := r.db.WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant)
base := r.reader().WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantID, tenant)
base = applyLogFilterCriteria(base, filter)
if filter.Search != "" {
search := "%" + filter.Search + "%"
Expand All @@ -180,7 +180,7 @@ func (r *Repository) GetLogContext(ctx context.Context, targetTime time.Time) ([
end := targetTime.Add(1 * time.Minute)

var logs []Log
if err := r.db.WithContext(ctx).Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end).
if err := r.reader().WithContext(ctx).Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end).
Order("timestamp asc").
Find(&logs).Error; err != nil {
return nil, fmt.Errorf("failed to fetch log context: %w", err)
Expand Down Expand Up @@ -223,7 +223,7 @@ func (r *Repository) LogsForVectorReplay(ctx context.Context, sinceID uint, limi
limit = 10_000
}
var logs []Log
err := r.db.WithContext(ctx).
err := r.reader().WithContext(ctx).
Where("id > ? AND severity IN ?", sinceID, []string{"ERROR", "WARN", "WARNING", "FATAL", "CRITICAL"}).
Order("id ASC").
Limit(limit).
Expand All @@ -244,7 +244,7 @@ func (r *Repository) ListRecentHighSeverityLogsAllTenants(ctx context.Context, s
if limit <= 0 {
limit = 5000
}
q := r.db.WithContext(ctx).Model(&Log{})
q := r.reader().WithContext(ctx).Model(&Log{})
if severity != "" {
q = q.Where(sqlWhereSeverity, severity)
}
Expand Down
14 changes: 7 additions & 7 deletions internal/storage/metrics_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *Repository) BatchCreateMetrics(buckets []MetricBucket) error {
func (r *Repository) GetMetricBuckets(ctx context.Context, start, end time.Time, serviceName string, metricName string) ([]MetricBucket, error) {
tenant := TenantFromContext(ctx)
var buckets []MetricBucket
query := r.db.WithContext(ctx).Where("tenant_id = ? AND time_bucket BETWEEN ? AND ?", tenant, start, end)
query := r.reader().WithContext(ctx).Where("tenant_id = ? AND time_bucket BETWEEN ? AND ?", tenant, start, end)
if serviceName != "" {
query = query.Where("service_name = ?", serviceName)
}
Expand All @@ -86,7 +86,7 @@ func (r *Repository) GetMetricBuckets(ctx context.Context, start, end time.Time,
func (r *Repository) GetMetricNames(ctx context.Context, serviceName string) ([]string, error) {
tenant := TenantFromContext(ctx)
var names []string
query := r.db.WithContext(ctx).Model(&MetricBucket{}).Where("tenant_id = ?", tenant)
query := r.reader().WithContext(ctx).Model(&MetricBucket{}).Where("tenant_id = ?", tenant)
if serviceName != "" {
query = query.Where("service_name = ?", serviceName)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func (r *Repository) GetDashboardStats(ctx context.Context, start, end time.Time
tenant := TenantFromContext(ctx)
var stats DashboardStats

baseQuery := r.db.WithContext(ctx).Model(&Trace{}).Where(sqlWhereTenantTimeBetween, tenant, start, end)
baseQuery := r.reader().WithContext(ctx).Model(&Trace{}).Where(sqlWhereTenantTimeBetween, tenant, start, end)
if len(serviceNames) > 0 {
baseQuery = baseQuery.Where(sqlWhereServiceIn, serviceNames)
}
Expand All @@ -193,7 +193,7 @@ func (r *Repository) GetDashboardStats(ctx context.Context, start, end time.Time
}

// 2. Total Logs
logQuery := r.db.WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantTimeBetween, tenant, start, end)
logQuery := r.reader().WithContext(ctx).Model(&Log{}).Where(sqlWhereTenantTimeBetween, tenant, start, end)
if len(serviceNames) > 0 {
logQuery = logQuery.Where(sqlWhereServiceIn, serviceNames)
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func (r *Repository) GetTrafficMetrics(ctx context.Context, start, end time.Time
}
var rows []traceRow

query := r.db.WithContext(ctx).Model(&Trace{}).
query := r.reader().WithContext(ctx).Model(&Trace{}).
Select("timestamp, status").
Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end)

Expand Down Expand Up @@ -335,7 +335,7 @@ func (r *Repository) GetTrafficMetrics(ctx context.Context, start, end time.Time
func (r *Repository) GetLatencyHeatmap(ctx context.Context, start, end time.Time, serviceNames []string) ([]LatencyPoint, error) {
tenant := TenantFromContext(ctx)
var points []LatencyPoint
query := r.db.WithContext(ctx).Model(&Trace{}).
query := r.reader().WithContext(ctx).Model(&Trace{}).
Select("timestamp, duration").
Where("tenant_id = ? AND timestamp BETWEEN ? AND ?", tenant, start, end)

Expand Down Expand Up @@ -393,7 +393,7 @@ func (r *Repository) PurgeMetricBucketsBatched(ctx context.Context, olderThan ti
func (r *Repository) GetServices(ctx context.Context) ([]string, error) {
tenant := TenantFromContext(ctx)
var services []string
if err := r.db.WithContext(ctx).Model(&Trace{}).
if err := r.reader().WithContext(ctx).Model(&Trace{}).
Where("tenant_id = ?", tenant).
Distinct("service_name").
Order("service_name ASC").
Expand Down
Loading
Loading