Skip to content

Commit be90f60

Browse files
Remove finished dispatcher handlers, order dispatch handlers
1 parent dbf3273 commit be90f60

3 files changed

Lines changed: 68 additions & 16 deletions

File tree

lib/temporal/workflow/context.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ def wait_for_any(*futures)
254254
fiber = Fiber.current
255255
blocked = true
256256

257-
futures.each do |future|
257+
handlers = futures.map do |future|
258258
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
259259
# Because any of the futures can resume the fiber, ignore any callbacks
260260
# from other futures after unblocking has occurred
@@ -266,6 +266,7 @@ def wait_for_any(*futures)
266266
end
267267

268268
Fiber.yield
269+
handlers.each(&:unregister)
269270

270271
return
271272
end
@@ -279,7 +280,7 @@ def wait_until(&unblock_condition)
279280
fiber = Fiber.current
280281
blocked = true
281282

282-
dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
283+
handler = dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
283284
# Because this block can run for any dispatch, ensure the fiber is only
284285
# resumed one time by checking if it's already been unblocked.
285286
if blocked && unblock_condition.call
@@ -289,6 +290,7 @@ def wait_until(&unblock_condition)
289290
end
290291

291292
Fiber.yield
293+
handler.unregister
292294

293295
return
294296
end
@@ -316,6 +318,8 @@ def on_signal(signal_name = nil, &block)
316318
call_in_fiber(block, signal, input)
317319
end
318320
end
321+
322+
return
319323
end
320324

321325
def on_query(query, &block)

lib/temporal/workflow/dispatcher.rb

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,36 @@ class Workflow
1313
# the event_name. The order of this dispatch is not guaranteed.
1414
#
1515
class Dispatcher
16+
class DispatchHandler
17+
def initialize(handlers_for_target, id)
18+
@handlers_for_target = handlers_for_target
19+
@id = id
20+
end
21+
22+
# Unregister the handler from the dispatcher
23+
def unregister
24+
handlers_for_target.delete(id)
25+
end
26+
27+
private
28+
29+
attr_reader :handlers_for_target, :id
30+
end
31+
1632
WILDCARD = '*'.freeze
1733
TARGET_WILDCARD = '*'.freeze
1834

1935
EventStruct = Struct.new(:event_name, :handler)
2036

2137
def initialize
22-
@handlers = Hash.new { |hash, key| hash[key] = [] }
38+
@handlers = Hash.new { |hash, key| hash[key] = {} }
39+
@next_id = 0
2340
end
2441

2542
def register_handler(target, event_name, &handler)
26-
handlers[target] << EventStruct.new(event_name, handler)
27-
self
43+
@next_id += 1
44+
handlers[target][@next_id] = EventStruct.new(event_name, handler)
45+
DispatchHandler.new(handlers[target], @next_id)
2846
end
2947

3048
def dispatch(target, event_name, args = nil)
@@ -39,9 +57,10 @@ def dispatch(target, event_name, args = nil)
3957

4058
def handlers_for(target, event_name)
4159
handlers[target]
42-
.concat(handlers[TARGET_WILDCARD])
43-
.select { |event_struct| match?(event_struct, event_name) }
44-
.map(&:handler)
60+
.merge(handlers[TARGET_WILDCARD]) { raise 'Cannot resolve duplicate dispatcher handler IDs' }
61+
.select { |_, event_struct| match?(event_struct, event_name) }
62+
.sort
63+
.map { |_, event_struct| event_struct.handler }
4564
end
4665

4766
def match?(event_struct, event_name)

spec/unit/lib/temporal/workflow/dispatcher_spec.rb

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
describe '#register_handler' do
99
let(:block) { -> { 'handler body' } }
1010
let(:event_name) { 'signaled' }
11-
let(:dispatcher) { subject.register_handler(target, event_name, &block) }
11+
let(:dispatcher) do
12+
subject.register_handler(target, event_name, &block)
13+
subject
14+
end
1215
let(:handlers) { dispatcher.send(:handlers) }
1316

1417
context 'with default handler_name' do
@@ -19,17 +22,17 @@
1922
end
2023

2124
it 'stores the target and handler once' do
22-
expect(handlers[target]).to be_kind_of(Array)
25+
expect(handlers[target]).to be_kind_of(Hash)
2326
expect(handlers[target].count).to eq 1
2427
end
2528

2629
it 'associates the event name with the target' do
27-
event = handlers[target].first
30+
event = handlers[target][1]
2831
expect(event.event_name).to eq(event_name)
2932
end
3033

3134
it 'associates the handler with the target' do
32-
event = handlers[target].first
35+
event = handlers[target][1]
3336
expect(event.handler).to eq(block)
3437
end
3538
end
@@ -43,20 +46,42 @@
4346
end
4447

4548
it 'stores the target and handler once' do
46-
expect(handlers[target]).to be_kind_of(Array)
49+
expect(handlers[target]).to be_kind_of(Hash)
4750
expect(handlers[target].count).to eq 1
4851
end
4952

5053
it 'associates the event name and handler name with the target' do
51-
event = handlers[target].first
54+
event = handlers[target][1]
5255
expect(event.event_name).to eq(event_name)
5356
end
5457

5558
it 'associates the handler with the target' do
56-
event = handlers[target].first
59+
event = handlers[target][1]
5760
expect(event.handler).to eq(block)
5861
end
5962
end
63+
64+
it 'removes a given handler against the target' do
65+
block1 = -> { 'handler body' }
66+
block2 = -> { 'other handler body' }
67+
block3 = -> { 'yet another handler body' }
68+
69+
handle1 = subject.register_handler(target, 'signaled', &block1)
70+
subject.register_handler(target, 'signaled', &block2)
71+
subject.register_handler(other_target, 'signaled', &block3)
72+
73+
expect(subject.send(:handlers)[target][1].event_name).to eq('signaled')
74+
expect(subject.send(:handlers)[target][1].handler).to be(block1)
75+
76+
expect(subject.send(:handlers)[target][2].event_name).to eq('signaled')
77+
expect(subject.send(:handlers)[target][2].handler).to be(block2)
78+
79+
expect(subject.send(:handlers)[other_target][3].event_name).to eq('signaled')
80+
expect(subject.send(:handlers)[other_target][3].handler).to be(block3)
81+
82+
handle1.unregister
83+
expect(subject.send(:handlers)[target][1]).to be(nil)
84+
end
6085
end
6186

6287
describe '#dispatch' do
@@ -114,10 +139,13 @@
114139

115140
context 'with TARGET_WILDCARD target handler' do
116141
let(:handler_6) { -> { 'sixth block' } }
142+
let(:handler_7) { -> { 'seventh block' } }
117143
before do
118144
allow(handler_6).to receive(:call)
145+
allow(handler_7).to receive(:call)
119146

120147
subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6)
148+
subject.register_handler(target, 'completed', &handler_7)
121149
end
122150

123151
it 'calls the handler' do
@@ -127,6 +155,7 @@
127155
expect(handler_1).to have_received(:call).ordered
128156
expect(handler_4).to have_received(:call).ordered
129157
expect(handler_6).to have_received(:call).ordered
158+
expect(handler_7).to have_received(:call).ordered
130159
end
131160

132161
it 'TARGET_WILDCARD can be compared to an EventTarget object' do
@@ -157,4 +186,4 @@
157186
end
158187
end
159188
end
160-
end
189+
end

0 commit comments

Comments
 (0)