Skip to content

Commit c858941

Browse files
Add memos
1 parent 5648972 commit c858941

29 files changed

Lines changed: 207 additions & 43 deletions

examples/spec/integration/await_workflow_result_spec.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@
9595
run_id = Temporal.start_workflow(
9696
LoopWorkflow,
9797
2, # it continues as new if this arg is > 1
98-
{ options: { workflow_id: workflow_id } },
98+
options: {
99+
workflow_id: workflow_id,
100+
},
99101
)
100102

101103
expect do
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
require 'workflows/loop_workflow'
2+
3+
describe LoopWorkflow do
4+
it 'workflow continues as new into a new run' do
5+
workflow_id = SecureRandom.uuid
6+
memo = {
7+
'my-memo' => 'foo',
8+
}
9+
run_id = Temporal.start_workflow(
10+
LoopWorkflow,
11+
2, # it continues as new if this arg is > 1
12+
options: {
13+
workflow_id: workflow_id,
14+
memo: memo,
15+
},
16+
)
17+
18+
# First run will throw because it continued as new
19+
next_run_id = nil
20+
expect do
21+
Temporal.await_workflow_result(
22+
LoopWorkflow,
23+
workflow_id: workflow_id,
24+
run_id: run_id,
25+
)
26+
end.to raise_error(Temporal::WorkflowRunContinuedAsNew) do |error|
27+
next_run_id = error.new_run_id
28+
end
29+
30+
expect(next_run_id).to_not eq(nil)
31+
32+
# Second run will not throw because it returns rather than continues as new.
33+
final_result = Temporal.await_workflow_result(
34+
LoopWorkflow,
35+
workflow_id: workflow_id,
36+
run_id: next_run_id,
37+
)
38+
39+
expect(final_result[:count]).to eq(1)
40+
41+
# memo should be copied to the next run automatically
42+
expect(final_result[:memo]).to eq(memo)
43+
end
44+
end

examples/spec/integration/metadata_workflow_spec.rb

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
workflow_id = 'task-queue-' + SecureRandom.uuid
88
run_id = Temporal.start_workflow(
99
subject,
10-
{ options: { workflow_id: workflow_id } },
10+
options: { workflow_id: workflow_id }
1111
)
12+
1213
actual_result = Temporal.await_workflow_result(
1314
subject,
1415
workflow_id: workflow_id,
1516
run_id: run_id,
1617
)
18+
1719
expect(actual_result.task_queue).to eq(Temporal.configuration.task_queue)
1820
end
1921

@@ -51,4 +53,39 @@
5153
)
5254
expect(Time.now - actual_result.run_started_at).to be_between(0, 30)
5355
end
56+
57+
it 'gets memo from workflow execution info' do
58+
workflow_id = 'memo_execution_test_wf-' + SecureRandom.uuid
59+
run_id = Temporal.start_workflow(subject, options: { workflow_id: workflow_id, memo: { 'foo' => 'bar' } })
60+
61+
actual_result = Temporal.await_workflow_result(
62+
subject,
63+
workflow_id: workflow_id,
64+
run_id: run_id,
65+
)
66+
expect(actual_result.memo['foo']).to eq('bar')
67+
68+
expect(Temporal.fetch_workflow_execution_info(
69+
'ruby-samples', workflow_id, nil
70+
).memo).to eq({ 'foo' => 'bar' })
71+
end
72+
73+
it 'gets memo from workflow context with no memo' do
74+
workflow_id = 'memo_context_no_memo_test_wf-' + SecureRandom.uuid
75+
76+
run_id = Temporal.start_workflow(
77+
subject,
78+
options: { workflow_id: workflow_id }
79+
)
80+
81+
actual_result = Temporal.await_workflow_result(
82+
subject,
83+
workflow_id: workflow_id,
84+
run_id: run_id,
85+
)
86+
expect(actual_result.memo).to eq({})
87+
expect(Temporal.fetch_workflow_execution_info(
88+
'ruby-samples', workflow_id, nil
89+
).memo).to eq({})
90+
end
5491
end

examples/workflows/loop_workflow.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ def execute(count)
88
return workflow.continue_as_new(count - 1)
99
end
1010

11-
return count
11+
return {
12+
count: count,
13+
memo: workflow.metadata.memo,
14+
}
1215
end
1316
end

examples/workflows/metadata_workflow.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ class MetadataWorkflow < Temporal::Workflow
22
def execute
33
workflow.metadata
44
end
5-
end
5+
end

lib/temporal/client.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ def start_workflow(workflow, *input, options: {}, **args)
5757
run_timeout: compute_run_timeout(execution_options),
5858
task_timeout: execution_options.timeouts[:task],
5959
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
60-
headers: execution_options.headers
60+
headers: execution_options.headers,
61+
memo: execution_options.memo,
6162
)
6263
else
6364
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
@@ -73,6 +74,7 @@ def start_workflow(workflow, *input, options: {}, **args)
7374
task_timeout: execution_options.timeouts[:task],
7475
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
7576
headers: execution_options.headers,
77+
memo: execution_options.memo,
7678
signal_name: signal_name,
7779
signal_input: signal_input
7880
)
@@ -119,7 +121,8 @@ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args)
119121
task_timeout: execution_options.timeouts[:task],
120122
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
121123
headers: execution_options.headers,
122-
cron_schedule: cron_schedule
124+
cron_schedule: cron_schedule,
125+
memo: execution_options.memo
123126
)
124127

125128
response.run_id

lib/temporal/concerns/payloads.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ def from_signal_payloads(payloads)
2121
from_payloads(payloads)&.first
2222
end
2323

24+
def from_payload_map(payload_map)
25+
payload_map.map { |key, value| [key, from_payload(value)] }.to_h
26+
end
27+
2428
def to_payloads(data)
2529
payload_converter.to_payloads(data)
2630
end
@@ -41,6 +45,10 @@ def to_signal_payloads(data)
4145
to_payloads([data])
4246
end
4347

48+
def to_payload_map(data)
49+
data.transform_values(&method(:to_payload))
50+
end
51+
4452
private
4553

4654
def payload_converter

lib/temporal/connection/grpc.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ def start_workflow_execution(
8181
task_timeout:,
8282
workflow_id_reuse_policy: nil,
8383
headers: nil,
84-
cron_schedule: nil
84+
cron_schedule: nil,
85+
memo: nil
8586
)
8687
request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
8788
identity: identity,
@@ -101,7 +102,10 @@ def start_workflow_execution(
101102
header: Temporal::Api::Common::V1::Header.new(
102103
fields: headers
103104
),
104-
cron_schedule: cron_schedule
105+
cron_schedule: cron_schedule,
106+
memo: Temporal::Api::Common::V1::Memo.new(
107+
fields: to_payload_map(memo || {})
108+
)
105109
)
106110

107111
if workflow_id_reuse_policy
@@ -305,7 +309,8 @@ def signal_with_start_workflow_execution(
305309
headers: nil,
306310
cron_schedule: nil,
307311
signal_name:,
308-
signal_input:
312+
signal_input:,
313+
memo: nil
309314
)
310315
request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new(
311316
identity: identity,
@@ -327,7 +332,10 @@ def signal_with_start_workflow_execution(
327332
),
328333
cron_schedule: cron_schedule,
329334
signal_name: signal_name,
330-
signal_input: to_signal_payloads(signal_input)
335+
signal_input: to_signal_payloads(signal_input),
336+
memo: Temporal::Api::Common::V1::Memo.new(
337+
fields: to_payload_map(memo || {})
338+
),
331339
)
332340

333341
if workflow_id_reuse_policy

lib/temporal/connection/serializer/continue_as_new.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ def to_proto
1919
workflow_run_timeout: object.timeouts[:execution],
2020
workflow_task_timeout: object.timeouts[:task],
2121
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
22-
header: serialize_headers(object.headers)
22+
header: serialize_headers(object.headers),
23+
memo: serialize_memo(object.memo)
2324
)
2425
)
2526
end
@@ -31,6 +32,12 @@ def serialize_headers(headers)
3132

3233
Temporal::Api::Common::V1::Header.new(fields: object.headers)
3334
end
35+
36+
def serialize_memo(memo)
37+
return unless memo
38+
39+
Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo))
40+
end
3441
end
3542
end
3643
end

lib/temporal/connection/serializer/start_child_workflow.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def to_proto
2222
workflow_run_timeout: object.timeouts[:run],
2323
workflow_task_timeout: object.timeouts[:task],
2424
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
25-
header: serialize_headers(object.headers)
25+
header: serialize_headers(object.headers),
26+
memo: serialize_memo(object.memo),
2627
)
2728
)
2829
end
@@ -34,6 +35,12 @@ def serialize_headers(headers)
3435

3536
Temporal::Api::Common::V1::Header.new(fields: object.headers)
3637
end
38+
39+
def serialize_memo(memo)
40+
return unless memo
41+
42+
Temporal::Api::Common::V1::Memo.new(fields: object.memo)
43+
end
3744
end
3845
end
3946
end

0 commit comments

Comments
 (0)