Skip to content

Commit 0779681

Browse files
Extend wait_for to take multiple futures and a condition block
1 parent fba30df commit 0779681

8 files changed

Lines changed: 243 additions & 10 deletions

File tree

examples/bin/worker

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ worker.register_workflow(SideEffectWorkflow)
4242
worker.register_workflow(SimpleTimerWorkflow)
4343
worker.register_workflow(TimeoutWorkflow)
4444
worker.register_workflow(TripBookingWorkflow)
45+
worker.register_workflow(WaitForWorkflow)
4546

4647
worker.register_activity(AsyncActivity)
4748
worker.register_activity(EchoActivity)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
require 'workflows/wait_for_workflow'
2+
3+
describe WaitForWorkflow do
4+
5+
it 'signals at workflow start time' do
6+
workflow_id = SecureRandom.uuid
7+
run_id = Temporal.start_workflow(
8+
WaitForWorkflow,
9+
10, # number of echo activities to run
10+
2, # max activity parallelism
11+
'signal_name',
12+
options: { workflow_id: workflow_id }
13+
)
14+
15+
Temporal.signal_workflow(WaitForWorkflow, 'signal_name', workflow_id, run_id)
16+
17+
result = Temporal.await_workflow_result(
18+
WaitForWorkflow,
19+
workflow_id: workflow_id,
20+
run_id: run_id,
21+
)
22+
23+
expect(result.length).to eq(3)
24+
expect(result[:signal]).to eq(true)
25+
expect(result[:timer]).to eq(true)
26+
expect(result[:activity]).to eq(true)
27+
end
28+
end
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
require 'activities/echo_activity'
2+
require 'activities/long_running_activity'
3+
4+
# This example workflow exercises all three conditions that can change state that is being
5+
# awaited upon: activity completion, sleep completion, signal receieved.
6+
class WaitForWorkflow < Temporal::Workflow
7+
def execute(total_echos, max_echos_at_once, expected_signal)
8+
signals_received = {}
9+
10+
workflow.on_signal do |signal, input|
11+
signals_received[signal] = input
12+
end
13+
14+
workflow.wait_for do
15+
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
16+
signals_received.key?(expected_signal)
17+
end
18+
19+
# Run an activity but with a max time limit by starting a timer. This activity
20+
# will not complete before the timer, which may result in a failed activity task after the
21+
# workflow is completed.
22+
long_running_future = LongRunningActivity.execute(15, 0.1)
23+
timeout_timer = workflow.start_timer(1)
24+
workflow.wait_for(timeout_timer, long_running_future)
25+
26+
timer_beat_activity = timeout_timer.finished? && !long_running_future.finished?
27+
28+
# This should not wait further. The first future has already finished, and therefore
29+
# the second one should not be awaited upon.
30+
long_timeout_timer = workflow.start_timer(15)
31+
workflow.wait_for(timeout_timer, long_timeout_timer)
32+
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?
33+
34+
block_called = false
35+
workflow.wait_for(timeout_timer) do
36+
# This should never be called because the timeout_timer future was already
37+
# finished before the wait was even called.
38+
block_called = true
39+
end
40+
raise 'Block should not have been called' if block_called
41+
42+
workflow.wait_for(long_timeout_timer) do
43+
# This condition will immediately be true and not result in any waiting or dispatching
44+
true
45+
end
46+
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?
47+
48+
activity_futures = {}
49+
echos_completed = 0
50+
51+
total_echos.times do |i|
52+
workflow.wait_for do
53+
workflow.logger.info("Activities in flight #{activity_futures.length}")
54+
# Pause workflow until the number of active activity futures is less than 2. This
55+
# will throttle new activities from being started, guaranteeing that only two of these
56+
# activities are running at once.
57+
activity_futures.length < max_echos_at_once
58+
end
59+
60+
future = EchoActivity.execute("hi #{i}")
61+
activity_futures[i] = future
62+
63+
future.done do
64+
activity_futures.delete(i)
65+
echos_completed += 1
66+
end
67+
end
68+
69+
workflow.wait_for do
70+
workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}")
71+
activity_futures.empty?
72+
end
73+
74+
{
75+
signal: signals_received.key?(expected_signal),
76+
timer: timer_beat_activity,
77+
activity: echos_completed == total_echos
78+
}
79+
end
80+
end

lib/temporal/testing/local_workflow_context.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,16 @@ def wait_for_all(*futures)
165165
return
166166
end
167167

168-
def wait_for(future)
169-
# Point of communication
170-
Fiber.yield while !future.finished?
168+
def wait_for(*futures, &unblock_condition)
169+
if futures.empty? && unblock_condition.nil?
170+
raise 'You must pass either a future or an unblock condition block to wait_for'
171+
end
172+
173+
while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call)
174+
Fiber.yield
175+
end
176+
177+
return
171178
end
172179

173180
def now

lib/temporal/workflow/context.rb

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,14 +215,54 @@ def wait_for_all(*futures)
215215
return
216216
end
217217

218-
def wait_for(future)
218+
# Block workflow progress until any future is finished or any unblock_condition
219+
# block evaluates to true.
220+
def wait_for(*futures, &unblock_condition)
221+
if futures.empty? && unblock_condition.nil?
222+
raise 'You must pass either a future or an unblock condition block to wait_for'
223+
end
224+
219225
fiber = Fiber.current
226+
should_yield = false
227+
blocked = true
228+
229+
if futures.any?
230+
if futures.any?(&:finished?)
231+
blocked = false
232+
else
233+
should_yield = true
234+
futures.each do |future|
235+
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
236+
if blocked && future.finished?
237+
# Because this block can run for any dispatch, ensure the fiber is only
238+
# resumed one time by checking if it's already been unblocked.
239+
blocked = false
240+
fiber.resume
241+
end
242+
end
243+
end
244+
end
245+
end
220246

221-
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
222-
fiber.resume if future.finished?
247+
if blocked && unblock_condition
248+
if unblock_condition.call
249+
blocked = false
250+
should_yield = false
251+
else
252+
should_yield = true
253+
254+
dispatcher.register_handler(Dispatcher::WILDCARD, Dispatcher::WILDCARD) do
255+
# Because this block can run for any dispatch, ensure the fiber is only
256+
# resumed one time by checking if it's already been unblocked.
257+
if blocked && unblock_condition.call
258+
blocked = false
259+
fiber.resume
260+
end
261+
end
262+
end
223263
end
224264

225-
Fiber.yield
265+
Fiber.yield if should_yield
226266

227267
return
228268
end

lib/temporal/workflow/dispatcher.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def dispatch(target, event_name, args = nil)
2323

2424
def handlers_for(target, event_name)
2525
handlers[target]
26+
.concat(handlers[WILDCARD])
2627
.select { |(name, _)| name == event_name || name == WILDCARD }
2728
.map(&:last)
2829
end

spec/unit/lib/temporal/testing/local_workflow_context_spec.rb

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,69 @@ def execute
120120
result = workflow_context.execute_activity!(TestActivity)
121121
expect(result).to eq('ok')
122122
end
123+
124+
it 'can heartbeat' do
125+
# Heartbeat doesn't do anything in local mode, but at least it can be called.
126+
workflow_context.execute_activity!(TestHeartbeatingActivity)
127+
end
123128
end
124129

125-
it 'can heartbeat' do
126-
# Heartbeat doesn't do anything in local mode, but at least it can be called.
127-
workflow_context.execute_activity!(TestHeartbeatingActivity)
130+
describe '#wait_for' do
131+
it 'await unblocks once condition changes' do
132+
can_continue = false
133+
exited = false
134+
fiber = Fiber.new do
135+
workflow_context.wait_for do
136+
can_continue
137+
end
138+
139+
exited = true
140+
end
141+
142+
fiber.resume # start running
143+
expect(exited).to eq(false)
144+
145+
can_continue = true # change condition
146+
fiber.resume # resume running after the Fiber.yield done in context.await
147+
expect(exited).to eq(true)
148+
end
149+
150+
it 'condition or future unblocks' do
151+
exited = false
152+
153+
future = workflow_context.execute_activity(TestAsyncActivity)
154+
155+
fiber = Fiber.new do
156+
workflow_context.wait_for(future) do
157+
false
158+
end
159+
160+
exited = true
161+
end
162+
163+
fiber.resume # start running
164+
expect(exited).to eq(false)
165+
166+
execution.complete_activity(async_token, 'async_ok')
167+
168+
fiber.resume # resume running after the Fiber.yield done in context.await
169+
expect(exited).to eq(true)
170+
end
171+
172+
it 'any future unblocks' do
173+
exited = false
174+
175+
async_future = workflow_context.execute_activity(TestAsyncActivity)
176+
future = workflow_context.execute_activity(TestActivity)
177+
future.wait
178+
179+
fiber = Fiber.new do
180+
workflow_context.wait_for(future, async_future)
181+
exited = true
182+
end
183+
184+
fiber.resume # start running
185+
expect(exited).to eq(true)
186+
end
128187
end
129188
end

spec/unit/lib/temporal/workflow/dispatcher_spec.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,24 @@
6262

6363
expect(handler_5).to have_received(:call)
6464
end
65+
end
66+
67+
context 'with WILDCARD target handler' do
68+
let(:handler_6) { -> { 'sixth block' } }
69+
before do
70+
allow(handler_6).to receive(:call)
71+
72+
subject.register_handler(described_class::WILDCARD, described_class::WILDCARD, &handler_6)
73+
end
6574

75+
it 'calls the handler' do
76+
subject.dispatch('target', 'completed')
77+
78+
# Target handlers still invoked
79+
expect(handler_1).to have_received(:call).ordered
80+
expect(handler_4).to have_received(:call).ordered
81+
expect(handler_6).to have_received(:call).ordered
82+
end
6683
end
6784
end
6885
end

0 commit comments

Comments
 (0)