Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 65 additions & 10 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,57 @@ type Client interface {
Session() scaleset.RunnerScaleSetSession
}

type Option func(*Listener)
// MetricsRecorder defines the hook methods that will be called by the listener at
// various points in the message handling process. This allows for custom
// metrics to be collected without coupling the listener to a specific metrics
// implementation. The methods in this interface will be called with relevant
// information about the message handling process, such as the number of jobs
// started/completed, the desired runner count, and any errors that occur.
// Implementers can use this information to track the performance and behavior
// of the listener and the scaleset service.
type MetricsRecorder interface {
RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic)
RecordJobStarted(msg *scaleset.JobStarted)
RecordJobCompleted(msg *scaleset.JobCompleted)
RecordDesiredRunners(count int)
}

type discardMetricsRecorder struct{}

func (d *discardMetricsRecorder) RecordStatistics(statistics *scaleset.RunnerScaleSetStatistic) {}
func (d *discardMetricsRecorder) RecordJobStarted(msg *scaleset.JobStarted) {}
func (d *discardMetricsRecorder) RecordJobCompleted(msg *scaleset.JobCompleted) {}
func (d *discardMetricsRecorder) RecordDesiredRunners(count int) {}

// Listener listens for messages from the scaleset service and handles them. It automatically handles session
// creation/deletion/refreshing and message polling and acking.
type Listener struct {
// The main client responsible for communicating with the scaleset service
client Client
client Client
metricsRecorder MetricsRecorder

// Configuration for the listener
scaleSetID int
maxRunners atomic.Uint32
scaleSetID int
maxRunners atomic.Uint32
latestStatistics *scaleset.RunnerScaleSetStatistic

// configuration for the listener
logger *slog.Logger
}

type Option func(*Listener)

// WithMetricsRecorder sets the MetricsRecorder for the listener. If not set, a no-op recorder will be used.
// If the nil is passed, the MetricsRecorder will not be updated and the existing one will be used (which is a no-op by default).
func WithMetricsRecorder(recorder MetricsRecorder) Option {
return func(l *Listener) {
if recorder == nil {
return
}
l.metricsRecorder = recorder
}
}
Comment on lines +97 to +104
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WithMetricsRecorder will happily accept nil and overwrite the default recorder, which will panic later when the listener calls l.metricsRecorder.*. Consider ignoring nil recorders (keep the discard implementation) or defaulting nil to &discardMetricsRecorder{} inside this option.

Copilot uses AI. Check for mistakes.

// SetMaxRunners sets the capacity of the scaleset. It is concurrently
// safe to update the max runners during listener.Run.
func (l *Listener) SetMaxRunners(count int) {
Expand All @@ -85,12 +120,17 @@ func New(client Client, config Config, options ...Option) (*Listener, error) {
}

listener := &Listener{
client: client,
scaleSetID: config.ScaleSetID,
logger: config.Logger,
client: client,
metricsRecorder: &discardMetricsRecorder{},
scaleSetID: config.ScaleSetID,
logger: config.Logger,
}
listener.SetMaxRunners(config.MaxRunners)

for _, option := range options {
option(listener)
}

return listener, nil
}

Expand All @@ -114,13 +154,17 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
return fmt.Errorf("session statistics is nil")
}

l.handleStatistics(ctx, initialSession.Statistics)

l.logger.Info(
"Handling initial session statistics",
slog.Int("totalAssignedJobs", initialSession.Statistics.TotalAssignedJobs),
)
if _, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs); err != nil {
desiredCount, err := scaler.HandleDesiredRunnerCount(ctx, initialSession.Statistics.TotalAssignedJobs)
if err != nil {
return fmt.Errorf("handling initial message failed: %w", err)
}
l.metricsRecorder.RecordDesiredRunners(desiredCount)
}

var lastMessageID int
Expand All @@ -142,7 +186,7 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
}

if msg == nil {
_, err := scaler.HandleDesiredRunnerCount(ctx, 0)
_, err := scaler.HandleDesiredRunnerCount(ctx, l.latestStatistics.TotalAssignedJobs)
if err != nil {
return fmt.Errorf("handling nil message failed: %w", err)
}
Expand All @@ -160,24 +204,35 @@ func (l *Listener) Run(ctx context.Context, scaler Scaler) error {
}

func (l *Listener) handleMessage(ctx context.Context, handler Scaler, msg *scaleset.RunnerScaleSetMessage) error {
l.handleStatistics(ctx, msg.Statistics)

if err := l.client.DeleteMessage(ctx, msg.MessageID); err != nil {
return fmt.Errorf("failed to delete message: %w", err)
}

for _, jobStarted := range msg.JobStartedMessages {
l.metricsRecorder.RecordJobStarted(jobStarted)
if err := handler.HandleJobStarted(ctx, jobStarted); err != nil {
return fmt.Errorf("failed to handle job started: %w", err)
}
}
for _, jobCompleted := range msg.JobCompletedMessages {
l.metricsRecorder.RecordJobCompleted(jobCompleted)
if err := handler.HandleJobCompleted(ctx, jobCompleted); err != nil {
return fmt.Errorf("failed to handle job completed: %w", err)
}
}

if _, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs); err != nil {
desiredCount, err := handler.HandleDesiredRunnerCount(ctx, msg.Statistics.TotalAssignedJobs)
if err != nil {
return fmt.Errorf("failed to handle desired runner count: %w", err)
Comment on lines 206 to 228
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleMessage assumes msg.Statistics is non-nil (handleStatistics assigns it and later msg.Statistics.TotalAssignedJobs is dereferenced). Since RunnerScaleSetMessage.Statistics is a pointer, a nil value here will cause a panic and can also set l.latestStatistics to nil (breaking the msg == nil path). Add a nil check and return an error (similar to the initial session statistics check) before using it.

Copilot uses AI. Check for mistakes.
}
l.metricsRecorder.RecordDesiredRunners(desiredCount)

return nil
}

func (l *Listener) handleStatistics(ctx context.Context, msg *scaleset.RunnerScaleSetStatistic) {
l.latestStatistics = msg
l.metricsRecorder.RecordStatistics(msg)
}
97 changes: 47 additions & 50 deletions listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,39 +75,37 @@ func TestListener_Run(t *testing.T) {
client := NewMockClient(t)

uuid := uuid.New()
initialStatistics := &scaleset.RunnerScaleSetStatistic{
TotalAssignedJobs: 2,
}
session := scaleset.RunnerScaleSetSession{
SessionID: uuid,
OwnerName: "example",
RunnerScaleSet: &scaleset.RunnerScaleSet{},
MessageQueueURL: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: &scaleset.RunnerScaleSetStatistic{},
Statistics: initialStatistics,
}

client.On("Session").Return(session).Once()

l, err := New(client, config)
metricsRecorder := NewMockMetricsRecorder(t)
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
Return(initialStatistics.TotalAssignedJobs, nil).
Run(func(mock.Arguments) { cancel() }).
Once()

l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
require.Nil(t, err)

var called bool
handler := NewMockScaler(t)
handler.On(
"HandleDesiredRunnerCount",
mock.Anything,
mock.Anything,
).
Return(0, nil).
Run(
func(mock.Arguments) {
called = true
cancel()
},
).
handler.On("HandleDesiredRunnerCount", mock.Anything, mock.Anything).
Return(initialStatistics.TotalAssignedJobs, nil).
Once()

err = l.Run(ctx, handler)
assert.ErrorIs(t, err, context.Canceled)
assert.True(t, called)
})

t.Run("cancel context after get message", func(t *testing.T) {
Expand All @@ -120,61 +118,60 @@ func TestListener_Run(t *testing.T) {
MaxRunners: 10,
}

client := NewMockClient(t)
uuid := uuid.New()
initialStatistics := &scaleset.RunnerScaleSetStatistic{
TotalAssignedJobs: 2,
}

session := scaleset.RunnerScaleSetSession{
SessionID: uuid,
OwnerName: "example",
RunnerScaleSet: &scaleset.RunnerScaleSet{},
MessageQueueURL: "https://example.com",
MessageQueueAccessToken: "1234567890",
Statistics: &scaleset.RunnerScaleSetStatistic{},
Statistics: initialStatistics,
}

msg := &scaleset.RunnerScaleSetMessage{
MessageID: 1,
Statistics: &scaleset.RunnerScaleSetStatistic{},
MessageID: 1,
Statistics: &scaleset.RunnerScaleSetStatistic{
TotalAssignedJobs: 3,
},
}

metricsRecorder := NewMockMetricsRecorder(t)
client := NewMockClient(t)
handler := NewMockScaler(t)

client.On("Session").Return(session).Once()
client.On(
"GetMessage",
ctx,
mock.Anything,
10,
).
metricsRecorder.On("RecordStatistics", initialStatistics).Once()
metricsRecorder.On("RecordDesiredRunners", initialStatistics.TotalAssignedJobs).
Return(initialStatistics.TotalAssignedJobs, nil).
Once()
handler.On("HandleDesiredRunnerCount", mock.Anything, initialStatistics.TotalAssignedJobs).
Return(initialStatistics.TotalAssignedJobs, nil).
Once()

client.On("GetMessage", ctx, mock.Anything, 10).
Return(msg, nil).
Run(
func(mock.Arguments) {
cancel()
},
).
Run(func(mock.Arguments) { cancel() }).
Once()

metricsRecorder.On("RecordStatistics", msg.Statistics).Once()
// Ensure delete message is called without cancel
client.On(
"DeleteMessage",
context.WithoutCancel(ctx),
mock.Anything,
).Return(nil).Once()
client.On("DeleteMessage", context.WithoutCancel(ctx), mock.Anything).
Return(nil).
Once()

handler := NewMockScaler(t)
handler.On(
"HandleDesiredRunnerCount",
mock.Anything,
0,
).
Return(0, nil).
metricsRecorder.On("RecordDesiredRunners", msg.Statistics.TotalAssignedJobs).
Return(msg.Statistics.TotalAssignedJobs, nil).
Once()

handler.On(
"HandleDesiredRunnerCount",
mock.Anything,
mock.Anything,
).
Return(0, nil).
handler.On("HandleDesiredRunnerCount", mock.Anything, msg.Statistics.TotalAssignedJobs).
Return(msg.Statistics.TotalAssignedJobs, nil).
Once()

l, err := New(client, config)
l, err := New(client, config, WithMetricsRecorder(metricsRecorder))
require.Nil(t, err)

err = l.Run(ctx, handler)
Expand Down
Loading