Collect and send PostgreSQL query performance metrics via pg_stat_statements #175
Summary
Add query performance monitoring by collecting aggregate stats from pg_stat_statements and sending them as a new postgresql.statements metric type to the selfhost control plane. This enables users to identify slow queries, track execution trends, and set alerts on query performance thresholds.
Problem
Currently, hostlink collects system-level PostgreSQL metrics (connections, cache hit ratio, TPS, replication lag) but has no visibility into which queries are slow or resource-intensive. Users cannot answer:
- Which queries take the most total time?
- Which queries have the highest average execution time?
- Which queries are called most frequently and might benefit from optimization?
Approach
Inspired by New Relic's query performance monitoring: use `pg_stat_statements` aggregate data per query fingerprint, delta-based collection (per-interval, not cumulative), and the top 20 queries per tick.
Requirements
Hostlink (this repo)
Domain — new metric type
- Add `MetricTypePostgreSQLStatements = "postgresql.statements"` constant in `domain/metrics/metrics.go` alongside existing constants (`MetricTypeSystem`, `MetricTypeNetwork`, `MetricTypePostgreSQLDatabase`, `MetricTypeStorage`)
- Add `QuerySample` struct in `domain/metrics/metrics.go`:

```go
type QuerySample struct {
	QueryID         string  `json:"query_id"`
	Query           string  `json:"query"`
	Calls           int64   `json:"calls"`
	TotalExecTimeMs float64 `json:"total_exec_time_ms"`
	MeanExecTimeMs  float64 `json:"mean_exec_time_ms"`
	MinExecTimeMs   float64 `json:"min_exec_time_ms"`
	MaxExecTimeMs   float64 `json:"max_exec_time_ms"`
	Rows            int64   `json:"rows"`
	SharedBlksHit   int64   `json:"shared_blks_hit"`
	SharedBlksRead  int64   `json:"shared_blks_read"`
	TempBlksWritten int64   `json:"temp_blks_written"`
	WALBytes        int64   `json:"wal_bytes"`
}
```

- Add `PostgreSQLStatementMetrics` wrapper struct:

```go
type PostgreSQLStatementMetrics struct {
	Queries []QuerySample `json:"queries"`
}
```
Collector — update interface and implementation
- Extend the `pgmetrics.Collector` interface in `internal/pgmetrics/collector.go` with a new method: `CollectStatements(credential.Credential) (metrics.PostgreSQLStatementMetrics, error)` (avoids breaking the existing `Collect` signature)
- Add internal state for statement delta tracking to the `pgmetrics` struct: `lastStatements map[string]pgStatementStats` and `lastStatementsTime time.Time`
- Implement `collectStatementMetrics()`:

```sql
SELECT queryid::text, query, calls, total_exec_time, min_exec_time,
       max_exec_time, mean_exec_time, rows, shared_blks_hit,
       shared_blks_read, temp_blks_written, wal_bytes
FROM pg_stat_statements
WHERE dbid = (SELECT oid FROM pg_database WHERE datname = current_database())
ORDER BY total_exec_time DESC
LIMIT 20
```

- Delta-based calculation: store the previous snapshot keyed by `queryid`, compute per-interval deltas for `calls` and `total_exec_time`, and recompute `mean_exec_time` from the deltas
- First collection returns an empty `Queries` slice — establishes the baseline
- Gracefully skip if the `pg_stat_statements` extension is not available (check the error code, log a warning, return empty metrics — do NOT fail the push)
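The delta tracking described above can be sketched as follows. This is a minimal illustration, not the actual hostlink implementation: the `deltaSamples` helper and the trimmed-down field set are assumptions, and only `pgStatementStats` (the snapshot map's value type) is named by this issue.

```go
package main

import "fmt"

// pgStatementStats holds the cumulative counters from one
// pg_stat_statements row, keyed elsewhere by queryid.
type pgStatementStats struct {
	Calls           int64
	TotalExecTimeMs float64
}

// QuerySample holds per-interval (delta) values; trimmed to two
// counters here for brevity.
type QuerySample struct {
	QueryID         string
	Calls           int64
	TotalExecTimeMs float64
	MeanExecTimeMs  float64
}

// deltaSamples computes per-interval values from two cumulative
// snapshots keyed by queryid. Queries that are new, or whose counters
// went backwards (stats reset), are skipped so the next tick
// re-establishes their baseline.
func deltaSamples(prev, curr map[string]pgStatementStats) []QuerySample {
	if prev == nil {
		// First collection: no baseline yet, return empty.
		return nil
	}
	var out []QuerySample
	for id, c := range curr {
		p, ok := prev[id]
		if !ok || c.Calls < p.Calls {
			// New query or counters reset: baseline only.
			continue
		}
		dCalls := c.Calls - p.Calls
		if dCalls == 0 {
			// No activity this interval.
			continue
		}
		dTime := c.TotalExecTimeMs - p.TotalExecTimeMs
		out = append(out, QuerySample{
			QueryID:         id,
			Calls:           dCalls,
			TotalExecTimeMs: dTime,
			// Recomputed from deltas, not taken from mean_exec_time.
			MeanExecTimeMs: dTime / float64(dCalls),
		})
	}
	return out
}

func main() {
	prev := map[string]pgStatementStats{"q1": {Calls: 10, TotalExecTimeMs: 100}}
	curr := map[string]pgStatementStats{
		"q1": {Calls: 14, TotalExecTimeMs: 180},
		"q2": {Calls: 3, TotalExecTimeMs: 30}, // new query: baseline only
	}
	fmt.Println(deltaSamples(prev, curr)) // [{q1 4 80 20}]
}
```

After `pg_stat_statements_reset()` the cumulative counters drop below the stored snapshot, which the `c.Calls < p.Calls` guard treats as a re-baseline rather than emitting a bogus negative delta.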
Pusher — wire into metricspusher
- Update `NewWithDependencies()` in `app/services/metrics/metrics.go` to accept the extended `pgmetrics.Collector` (already the dependency — it just needs to call the new method)
- In `Push()`, call `mp.metricscollector.CollectStatements(cred)` after the existing PG collection
- Append `MetricSet{Type: MetricTypePostgreSQLStatements, Metrics: stmtMetrics}` to `metricSets`
- Include the set even when `Queries` is empty (signals the extension is available but there was no activity)
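The pusher wiring could look roughly like this. The `statementCollector` interface, `appendStatements` helper, and the simplified types are illustrative stand-ins, not the real `metricspusher` code:

```go
package main

import "fmt"

// Simplified stand-ins for the issue's domain types.
const MetricTypePostgreSQLStatements = "postgresql.statements"

type PostgreSQLStatementMetrics struct {
	// The real type holds []QuerySample; simplified here.
	Queries []string `json:"queries"`
}

type MetricSet struct {
	Type    string `json:"type"`
	Metrics any    `json:"metrics"`
}

// statementCollector mimics the new method on the extended Collector.
type statementCollector interface {
	CollectStatements() (PostgreSQLStatementMetrics, error)
}

// appendStatements mirrors the proposed Push() behavior: append the
// statements set even when Queries is empty (extension present, no
// activity), and on a collection error log and skip so the push of
// the other metric sets still succeeds.
func appendStatements(c statementCollector, sets []MetricSet) []MetricSet {
	stmt, err := c.CollectStatements()
	if err != nil {
		fmt.Println("statements collection skipped:", err)
		return sets
	}
	return append(sets, MetricSet{Type: MetricTypePostgreSQLStatements, Metrics: stmt})
}

// fakeCollector simulates an idle database: extension available,
// no query activity this interval.
type fakeCollector struct{}

func (fakeCollector) CollectStatements() (PostgreSQLStatementMetrics, error) {
	return PostgreSQLStatementMetrics{Queries: []string{}}, nil
}

func main() {
	sets := appendStatements(fakeCollector{}, nil)
	fmt.Println(len(sets), sets[0].Type) // 1 postgresql.statements
}
```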
Tests
- Unit tests for delta calculation: first collection → empty, subsequent → correct deltas, query appearing/disappearing between collections, stats reset
- Unit test for graceful handling when extension is missing (mock returns error, verify empty result returned without propagation)
- Integration test with a PostgreSQL container (requires `shared_preload_libraries = 'pg_stat_statements'`) — verify actual queries appear in the results
Selfhost (separate PR)
Database — new table
- Migration for the `agent_query_samples` table:

```ruby
create_table :agent_query_samples do |t|
  t.string :agent_pid, null: false
  t.string :query_id
  t.text :query_text
  t.bigint :calls
  t.float :total_exec_time_ms
  t.float :mean_exec_time_ms
  t.float :min_exec_time_ms
  t.float :max_exec_time_ms
  t.bigint :rows
  t.bigint :shared_blks_hit
  t.bigint :shared_blks_read
  t.bigint :temp_blks_written
  t.bigint :wal_bytes
  t.datetime :timestamp
  t.timestamps
end
```

- Indexes: `(agent_pid, timestamp)`, `(agent_pid, query_id, timestamp)`
- Foreign key: `agent_pid` → `agents.pid` (follows the same pattern as `agent_heartbeats`)
Model
- `AgentQuerySample` model in `app/models/agent_query_sample.rb`:

```ruby
class AgentQuerySample < ApplicationRecord
  belongs_to :agent, foreign_key: :agent_pid, primary_key: :pid
end
```

- Add `has_many :agent_query_samples` to the `Agent` model
Ingestion — heartbeats_controller#create
- Add a branch inside the `metric_sets.each` loop: when `metric_set["type"] == "postgresql.statements"`, iterate over the `metric_set["metrics"]["queries"]` array and create `AgentQuerySample` records instead of `AgentHeartbeat`
- Continue creating `AgentHeartbeat` for all other metric types (no change to the existing flow)
- ActionCable broadcast: include the `postgresql.statements` metric set in the broadcast payload (same as other types — no special handling needed)
API — query performance endpoint
- New route: `GET /organizations/:id/query_performance?agent_pid=...&start_date=...&end_date=...&sort_by=total_exec_time|mean_exec_time|calls&limit=20`
- New action in `organizations_controller` (follows the pattern of the `instance_metrics` action at line 155)
- Query: `AgentQuerySample.where(agent_pid:).where(timestamp: range).order(sort_by => :desc).limit(limit)`
- Group by `query_id` for a time-series view of individual query trends
Alerting
- Add to `AlertRule::VALID_METRICS` in `app/models/alert_rule.rb`: `"postgresql.statements" => %w[mean_exec_time_ms max_exec_time_ms calls]`
- Add a `when "postgresql.statements"` branch in `EvaluationService#query_heartbeat` (`app/services/alerting/evaluation_service.rb:107`)
- Add a `query_statement_metrics(instance)` method: query the latest `AgentQuerySample` per `query_id`, extract the alert's `metric_name` from the sample with the highest `mean_exec_time_ms` (alert on the worst query, not the average across all)
Health check — optional
- In `InstanceHealthCheckJob`, optionally check whether `postgresql.statements` data is flowing (a staleness check like the other metric types)
Architecture
```text
pg_stat_statements ──SQL──► hostlink/pgmetrics ──delta──► metric payload
                                                               │
                                          POST /api/v1/agents/{id}/metrics
                                                               ▼
                                                    selfhost controller
                                                               │
                                      ┌────────────────────────┼────────────────────┐
                                      │                        │                    │
                                      ▼                        ▼                    ▼
                               AgentHeartbeat          AgentQuerySample        ActionCable
                               (existing)              (NEW - per query        (real-time
                               system/net/             fingerprint)            broadcast)
                               pg.database/                    │
                               storage                         │
                                      ┌────────────────────────┤
                                      │                        │
                                      ▼                        ▼
                               Alert rules              Query perf API      Dashboard
                               (mean_time               endpoint            (top N slow,
                               > threshold)             (historical)        sparklines)
```
Wire changes summary
| File | Change |
|---|---|
| `domain/metrics/metrics.go` | Add `MetricTypePostgreSQLStatements` constant, `QuerySample` struct, `PostgreSQLStatementMetrics` struct |
| `internal/pgmetrics/collector.go` | Add `CollectStatements()` to the `Collector` interface, implement with delta tracking |
| `app/services/metrics/metrics.go` | Call `CollectStatements()` in `Push()`, append the new `MetricSet` |
| `app/jobs/metricsjob/metricsjob.go` | No change (already calls `mp.Push(cred)`, which handles everything) |
Deferred (v2+)
- Individual slow query samples (`pg_stat_activity` snapshots for queries exceeding a threshold)
- Wait time analysis (`blk_read_time`, `blk_write_time` — requires `track_io_timing`)
- Query plan analysis (`EXPLAIN` execution)
- Blocking/lock analysis (`pg_stat_activity` + `pg_locks` join)