Skip to content

Commit 6df7877

Browse files
Add check to prevent same worker from picking up requeued tests
1 parent e4e3f62 commit 6df7877

9 files changed

Lines changed: 257 additions & 12 deletions

File tree

redis/acknowledge.lua

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ local zset_key = KEYS[1]
22
local processed_key = KEYS[2]
33
local owners_key = KEYS[3]
44
local error_reports_key = KEYS[4]
5+
local requeued_by_key = KEYS[5]
56

67
local entry = ARGV[1]
78
local test_id = ARGV[2]
89
local error = ARGV[3]
910
local ttl = ARGV[4]
1011
redis.call('zrem', zset_key, entry)
1112
redis.call('hdel', owners_key, entry) -- Doesn't matter if it was reclaimed by another worker
13+
redis.call('hdel', requeued_by_key, entry)
1214
local acknowledged = redis.call('sadd', processed_key, test_id) == 1
1315

1416
if acknowledged and error ~= "" then

redis/requeue.lua

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ local zset_key = KEYS[4]
55
local worker_queue_key = KEYS[5]
66
local owners_key = KEYS[6]
77
local error_reports_key = KEYS[7]
8+
local requeued_by_key = KEYS[8]
89

910
local max_requeues = tonumber(ARGV[1])
1011
local global_max_requeues = tonumber(ARGV[2])
1112
local entry = ARGV[3]
1213
local test_id = ARGV[4]
1314
local offset = ARGV[5]
15+
local ttl = tonumber(ARGV[6])
1416

1517
if redis.call('hget', owners_key, entry) == worker_queue_key then
1618
redis.call('hdel', owners_key, entry)
@@ -42,6 +44,11 @@ else
4244
redis.call('lpush', queue_key, entry)
4345
end
4446

47+
redis.call('hset', requeued_by_key, entry, worker_queue_key)
48+
if ttl and ttl > 0 then
49+
redis.call('expire', requeued_by_key, ttl)
50+
end
51+
4552
redis.call('zrem', zset_key, entry)
4653

4754
return true

redis/reserve.lua

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,54 @@ local zset_key = KEYS[2]
33
local processed_key = KEYS[3]
44
local worker_queue_key = KEYS[4]
55
local owners_key = KEYS[5]
6+
local requeued_by_key = KEYS[6]
7+
local workers_key = KEYS[7]
68

79
local current_time = ARGV[1]
10+
local defer_offset = tonumber(ARGV[2]) or 0
11+
local max_skip_attempts = 4
812

9-
local test = redis.call('rpop', queue_key)
10-
if test then
11-
redis.call('zadd', zset_key, current_time, test)
12-
redis.call('lpush', worker_queue_key, test)
13-
redis.call('hset', owners_key, test, worker_queue_key)
14-
return test
15-
else
16-
return nil
13+
local function insert_with_offset(test)
14+
local pivot = redis.call('lrange', queue_key, -1 - defer_offset, 0 - defer_offset)[1]
15+
if pivot then
16+
redis.call('linsert', queue_key, 'BEFORE', pivot, test)
17+
else
18+
redis.call('lpush', queue_key, test)
19+
end
1720
end
21+
22+
for attempt = 1, max_skip_attempts do
23+
local test = redis.call('rpop', queue_key)
24+
if not test then
25+
return nil
26+
end
27+
28+
local requeued_by = redis.call('hget', requeued_by_key, test)
29+
if requeued_by == worker_queue_key then
30+
-- If this build only has one worker, allow immediate self-pickup.
31+
if redis.call('scard', workers_key) <= 1 then
32+
redis.call('hdel', requeued_by_key, test)
33+
redis.call('zadd', zset_key, current_time, test)
34+
redis.call('lpush', worker_queue_key, test)
35+
redis.call('hset', owners_key, test, worker_queue_key)
36+
return test
37+
end
38+
39+
insert_with_offset(test)
40+
41+
-- If every attempt only found tests this worker itself requeued, give up for this call:
42+
-- clear the requeued-by marker on the last such test and return nil, so a later reserve call may pick it up.
43+
if attempt == max_skip_attempts then
44+
redis.call('hdel', requeued_by_key, test)
45+
return nil
46+
end
47+
else
48+
redis.call('hdel', requeued_by_key, test)
49+
redis.call('zadd', zset_key, current_time, test)
50+
redis.call('lpush', worker_queue_key, test)
51+
redis.call('hset', owners_key, test, worker_queue_key)
52+
return test
53+
end
54+
end
55+
56+
return nil

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def acknowledge(test_key, error: nil, pipeline: redis)
183183
unreserve_entry(test_id)
184184
eval_script(
185185
:acknowledge,
186-
keys: [key('running'), key('processed'), key('owners'), key('error-reports')],
186+
keys: [key('running'), key('processed'), key('owners'), key('error-reports'), key('requeued-by')],
187187
argv: [entry, test_id, error.to_s, config.redis_ttl],
188188
pipeline: pipeline,
189189
) == 1
@@ -206,8 +206,9 @@ def requeue(test, offset: Redis.requeue_offset)
206206
key('worker', worker_id, 'queue'),
207207
key('owners'),
208208
key('error-reports'),
209+
key('requeued-by'),
209210
],
210-
argv: [config.max_requeues, global_max_requeues, entry, test_id, offset],
211+
argv: [config.max_requeues, global_max_requeues, entry, test_id, offset, config.redis_ttl],
211212
) == 1
212213

213214
unless requeued
@@ -372,8 +373,10 @@ def try_to_reserve_test
372373
key('processed'),
373374
key('worker', worker_id, 'queue'),
374375
key('owners'),
376+
key('requeued-by'),
377+
key('workers'),
375378
],
376-
argv: [CI::Queue.time_now.to_f],
379+
argv: [CI::Queue.time_now.to_f, Redis.requeue_offset],
377380
)
378381
end
379382

ruby/lib/minitest/queue/local_requeue_reporter.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ def message_for(test)
4646
def result_line
4747
"#{super}, #{requeues} requeues"
4848
end
49+
50+
def location(exception)
51+
backtrace = exception.backtrace
52+
return super if backtrace && !backtrace.empty?
53+
54+
nested_exception = exception.respond_to?(:error) ? exception.error : nil
55+
nested_backtrace = nested_exception&.backtrace
56+
return super(nested_exception) if nested_backtrace && !nested_backtrace.empty?
57+
58+
'unknown'
59+
end
4960
end
5061
end
5162
end

ruby/lib/minitest/queue/test_data.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,13 @@ def relative_path_for(path)
137137
def error_location(exception)
138138
@error_location ||= begin
139139
last_before_assertion = ''
140-
exception.backtrace.reverse_each do |s|
140+
backtrace_for(exception).reverse_each do |s|
141141
break if s =~ /in .(assert|refute|flunk|pass|fail|raise|must|wont)/
142142

143143
last_before_assertion = s
144144
end
145+
return ['unknown', 0] if last_before_assertion.empty?
146+
145147
path = last_before_assertion.sub(/:in .*$/, '')
146148
# the path includes the line number at the end,
147149
# which is separated by a :
@@ -151,6 +153,17 @@ def error_location(exception)
151153
[result.first, result.last.to_i]
152154
end
153155
end
156+
157+
def backtrace_for(exception)
158+
backtrace = exception.backtrace
159+
return backtrace if backtrace && !backtrace.empty?
160+
161+
nested_exception = exception.respond_to?(:error) ? exception.error : nil
162+
nested_backtrace = nested_exception&.backtrace
163+
return nested_backtrace if nested_backtrace && !nested_backtrace.empty?
164+
165+
[]
166+
end
154167
end
155168
end
156169
end

ruby/test/ci/queue/redis_test.rb

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,28 @@ def test_streaming_timeout_raises_lost_master
304304
end
305305
end
306306

307+
def test_reserve_defers_own_requeued_test_once
308+
queue = worker(1, populate: false, build_id: 'self-requeue-script')
309+
queue.send(:register)
310+
entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
311+
queue_key = queue.send(:key, 'queue')
312+
requeued_by_key = queue.send(:key, 'requeued-by')
313+
worker_queue_key = queue.send(:key, 'worker', queue.config.worker_id, 'queue')
314+
workers_key = queue.send(:key, 'workers')
315+
316+
@redis.lpush(queue_key, entry)
317+
@redis.hset(requeued_by_key, entry, worker_queue_key)
318+
@redis.sadd(workers_key, '2')
319+
320+
first_try = queue.send(:try_to_reserve_test)
321+
assert_nil first_try
322+
assert_equal [entry], @redis.lrange(queue_key, 0, -1)
323+
assert_nil @redis.hget(requeued_by_key, entry)
324+
325+
second_try = queue.send(:try_to_reserve_test)
326+
assert_equal entry, second_try
327+
end
328+
307329
def test_heartbeat_uses_test_id_for_processed_check
308330
queue = worker(1, populate: false)
309331
entry = "ATest#test_foo#{DELIMITER}/tmp/a_test.rb"
@@ -428,6 +450,87 @@ def test_worker_profiles_aggregates_multiple_workers
428450
assert_equal 'non-leader', profiles['2']['role']
429451
end
430452

453+
def test_worker_does_not_pick_up_its_own_requeued_test_when_others_are_available
454+
@redis.flushdb
455+
456+
test_list = TEST_LIST.first(3)
457+
w1 = worker(1, tests: test_list, build_id: 'self-requeue', timeout: 10, max_requeues: 1, requeue_tolerance: 1.0)
458+
w2 = worker(2, populate: false, build_id: 'self-requeue', timeout: 10, max_requeues: 1, requeue_tolerance: 1.0)
459+
w3 = worker(3, populate: false, build_id: 'self-requeue', timeout: 10, max_requeues: 1, requeue_tolerance: 1.0)
460+
w2.send(:register)
461+
w3.send(:register)
462+
463+
id_for = ->(test) { test.respond_to?(:id) ? test.id : CI::Queue::QueueEntry.test_id(test) }
464+
465+
requeued_test_id = nil
466+
picked_up_requeue = {}
467+
worker_two_reserved = false
468+
worker_three_reserved = false
469+
release_other_workers = false
470+
471+
mon = Monitor.new
472+
cond = mon.new_cond
473+
474+
threads = [
475+
Thread.new do
476+
w2.poll do |test|
477+
test_id = id_for.call(test)
478+
mon.synchronize do
479+
worker_two_reserved = true
480+
picked_up_requeue['2'] = true if test_id == requeued_test_id
481+
cond.broadcast
482+
cond.wait_until { release_other_workers }
483+
end
484+
w2.acknowledge(test_id)
485+
end
486+
end,
487+
Thread.new do
488+
w3.poll do |test|
489+
test_id = id_for.call(test)
490+
mon.synchronize do
491+
worker_three_reserved = true
492+
picked_up_requeue['3'] = true if test_id == requeued_test_id
493+
cond.broadcast
494+
cond.wait_until { release_other_workers }
495+
end
496+
w3.acknowledge(test_id)
497+
end
498+
end,
499+
]
500+
501+
mon.synchronize do
502+
cond.wait_until { worker_two_reserved && worker_three_reserved }
503+
end
504+
505+
worker_one_picked_its_own_requeue = false
506+
first_test = true
507+
508+
w1.poll do |test|
509+
test_id = id_for.call(test)
510+
511+
if first_test
512+
first_test = false
513+
requeued_test_id = test_id
514+
w1.report_failure!
515+
assert_equal true, w1.requeue(test)
516+
mon.synchronize do
517+
release_other_workers = true
518+
cond.broadcast
519+
end
520+
else
521+
worker_one_picked_its_own_requeue = true if test_id == requeued_test_id
522+
w1.acknowledge(test_id)
523+
end
524+
end
525+
526+
threads.each { |t| t.join(5) }
527+
528+
assert_equal false, worker_one_picked_its_own_requeue
529+
assert_equal true, picked_up_requeue.values.any?
530+
ensure
531+
threads&.each(&:kill)
532+
end
533+
431534
private
432535

433536
def shuffled_test_list
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true
2+
require 'test_helper'
3+
4+
module Minitest::Queue
5+
class LocalRequeueReporterTest < Minitest::Test
6+
include ReporterTestHelper
7+
8+
def setup
9+
@reporter = LocalRequeueReporter.new(verbose: true)
10+
end
11+
12+
def test_message_for_requeued_failure_without_backtrace
13+
test = result('test_foo', requeued: true)
14+
test.failure.failure.set_backtrace(nil)
15+
16+
message = @reporter.send(:message_for, test)
17+
18+
assert_includes message, '[unknown]'
19+
assert_includes message, 'Failed'
20+
end
21+
end
22+
end
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# frozen_string_literal: true
2+
3+
require 'test_helper'
4+
5+
module Minitest::Queue
6+
class TestDataTest < Minitest::Test
7+
include ReporterTestHelper
8+
9+
def test_error_location_without_backtrace
10+
failure = Minitest::Assertion.new('Assertion failed')
11+
test = result('test_foo', failure: failure)
12+
failure.set_backtrace(nil)
13+
14+
data = TestData.new(
15+
test: test,
16+
index: 0,
17+
namespace: 'namespace',
18+
base_path: Minitest::Queue.project_root,
19+
).to_h
20+
21+
assert_equal 'unknown', data[:error_file_path]
22+
assert_equal 0, data[:error_file_number]
23+
end
24+
25+
def test_error_location_uses_nested_exception_backtrace
26+
error = StandardError.new('boom')
27+
error.set_backtrace([
28+
"#{Minitest::Queue.project_root}/test/nested_error_test.rb:42:in `boom'",
29+
])
30+
failure = Minitest::UnexpectedError.new(error)
31+
failure.set_backtrace(nil)
32+
test = result('test_foo', failure: failure)
33+
34+
data = TestData.new(
35+
test: test,
36+
index: 0,
37+
namespace: 'namespace',
38+
base_path: Minitest::Queue.project_root,
39+
).to_h
40+
41+
assert_equal 'test/nested_error_test.rb', data[:error_file_path].to_s
42+
assert_equal 42, data[:error_file_number]
43+
end
44+
end
45+
end

0 commit comments

Comments
 (0)