Skip to content

Commit 36b45a6

Browse files
Add memos
1 parent a16f856 commit 36b45a6

23 files changed

Lines changed: 131 additions & 36 deletions

examples/spec/integration/metadata_workflow_spec.rb

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
workflow_id = 'task-queue-' + SecureRandom.uuid
88
run_id = Temporal.start_workflow(
99
subject,
10-
{ options: { workflow_id: workflow_id } },
10+
options: { workflow_id: workflow_id }
1111
)
12+
1213
actual_result = Temporal.await_workflow_result(
1314
subject,
1415
workflow_id: workflow_id,
1516
run_id: run_id,
1617
)
18+
1719
expect(actual_result.task_queue).to eq(Temporal.configuration.task_queue)
1820
end
1921

@@ -51,4 +53,39 @@
5153
)
5254
expect(Time.now - actual_result.run_started_at).to be_between(0, 30)
5355
end
56+
57+
it 'gets memo from workflow execution info' do
58+
workflow_id = 'memo_execution_test_wf-' + SecureRandom.uuid
59+
run_id = Temporal.start_workflow(subject, options: { workflow_id: workflow_id, memo: { 'foo' => 'bar' } })
60+
61+
actual_result = Temporal.await_workflow_result(
62+
subject,
63+
workflow_id: workflow_id,
64+
run_id: run_id,
65+
)
66+
expect(actual_result.memo['foo']).to eq('bar')
67+
68+
expect(Temporal.fetch_workflow_execution_info(
69+
'ruby-samples', workflow_id, nil
70+
).memo).to eq({ 'foo' => 'bar' })
71+
end
72+
73+
it 'gets memo from workflow context with no memo' do
74+
workflow_id = 'memo_context_no_memo_test_wf-' + SecureRandom.uuid
75+
76+
run_id = Temporal.start_workflow(
77+
subject,
78+
options: { workflow_id: workflow_id }
79+
)
80+
81+
actual_result = Temporal.await_workflow_result(
82+
subject,
83+
workflow_id: workflow_id,
84+
run_id: run_id,
85+
)
86+
expect(actual_result.memo).to eq({})
87+
expect(Temporal.fetch_workflow_execution_info(
88+
'ruby-samples', workflow_id, nil
89+
).memo).to eq({})
90+
end
5491
end

examples/workflows/metadata_workflow.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ class MetadataWorkflow < Temporal::Workflow
22
def execute
33
workflow.metadata
44
end
5-
end
5+
end

lib/temporal/client.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def start_workflow(workflow, *input, **args)
4646

4747
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
4848
workflow_id = options[:workflow_id] || SecureRandom.uuid
49+
memo = options[:memo] || {}
4950

5051
if signal_name.nil? && signal_input.nil?
5152
response = connection.start_workflow_execution(
@@ -59,7 +60,8 @@ def start_workflow(workflow, *input, **args)
5960
run_timeout: compute_run_timeout(execution_options),
6061
task_timeout: execution_options.timeouts[:task],
6162
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
62-
headers: execution_options.headers
63+
headers: execution_options.headers,
64+
memo: memo,
6365
)
6466
else
6567
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
@@ -75,6 +77,7 @@ def start_workflow(workflow, *input, **args)
7577
task_timeout: execution_options.timeouts[:task],
7678
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
7779
headers: execution_options.headers,
80+
memo: memo,
7881
signal_name: signal_name,
7982
signal_input: signal_input
8083
)
@@ -107,6 +110,7 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
107110

108111
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
109112
workflow_id = options[:workflow_id] || SecureRandom.uuid
113+
memo = options[:memo] || {}
110114

111115
response = connection.start_workflow_execution(
112116
namespace: execution_options.namespace,
@@ -122,7 +126,8 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
122126
task_timeout: execution_options.timeouts[:task],
123127
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
124128
headers: execution_options.headers,
125-
cron_schedule: cron_schedule
129+
cron_schedule: cron_schedule,
130+
memo: memo
126131
)
127132

128133
response.run_id

lib/temporal/concerns/payloads.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ def from_signal_payloads(payloads)
2121
from_payloads(payloads)&.first
2222
end
2323

24+
def from_payload_map(payload_map)
25+
payload_map.map { |key, value| [key, from_payload(value)] }.to_h
26+
end
27+
2428
def to_payloads(data)
2529
payload_converter.to_payloads(data)
2630
end
@@ -41,6 +45,10 @@ def to_signal_payloads(data)
4145
to_payloads([data])
4246
end
4347

48+
def to_payload_map(data)
49+
data.transform_values(&method(:to_payload))
50+
end
51+
4452
private
4553

4654
def payload_converter

lib/temporal/connection/grpc.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ def start_workflow_execution(
8181
task_timeout:,
8282
workflow_id_reuse_policy: nil,
8383
headers: nil,
84-
cron_schedule: nil
84+
cron_schedule: nil,
85+
memo: nil
8586
)
8687
request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
8788
identity: identity,
@@ -101,7 +102,10 @@ def start_workflow_execution(
101102
header: Temporal::Api::Common::V1::Header.new(
102103
fields: headers
103104
),
104-
cron_schedule: cron_schedule
105+
cron_schedule: cron_schedule,
106+
memo: Temporal::Api::Common::V1::Memo.new(
107+
fields: to_payload_map(memo)
108+
)
105109
)
106110

107111
if workflow_id_reuse_policy
@@ -305,7 +309,8 @@ def signal_with_start_workflow_execution(
305309
headers: nil,
306310
cron_schedule: nil,
307311
signal_name:,
308-
signal_input:
312+
signal_input:,
313+
memo: nil
309314
)
310315
request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new(
311316
identity: identity,
@@ -327,7 +332,10 @@ def signal_with_start_workflow_execution(
327332
),
328333
cron_schedule: cron_schedule,
329334
signal_name: signal_name,
330-
signal_input: to_signal_payloads(signal_input)
335+
signal_input: to_signal_payloads(signal_input),
336+
memo: Temporal::Api::Common::V1::Memo.new(
337+
fields: to_payload_map(memo || {})
338+
),
331339
)
332340

333341
if workflow_id_reuse_policy

lib/temporal/connection/serializer/continue_as_new.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ def to_proto
1919
workflow_run_timeout: object.timeouts[:execution],
2020
workflow_task_timeout: object.timeouts[:task],
2121
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
22-
header: serialize_headers(object.headers)
22+
header: serialize_headers(object.headers),
23+
memo: serialize_memo(object.memo)
2324
)
2425
)
2526
end
@@ -31,6 +32,12 @@ def serialize_headers(headers)
3132

3233
Temporal::Api::Common::V1::Header.new(fields: object.headers)
3334
end
35+
36+
def serialize_memo(memo)
37+
return unless memo
38+
39+
Temporal::Api::Common::V1::Memo.new(fields: object.memo)
40+
end
3441
end
3542
end
3643
end

lib/temporal/connection/serializer/start_child_workflow.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def to_proto
2222
workflow_run_timeout: object.timeouts[:run],
2323
workflow_task_timeout: object.timeouts[:task],
2424
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto,
25-
header: serialize_headers(object.headers)
25+
header: serialize_headers(object.headers),
26+
memo: serialize_memo(object.memo),
2627
)
2728
)
2829
end
@@ -34,6 +35,12 @@ def serialize_headers(headers)
3435

3536
Temporal::Api::Common::V1::Header.new(fields: object.headers)
3637
end
38+
39+
def serialize_memo(memo)
40+
return unless memo
41+
42+
Temporal::Api::Common::V1::Memo.new(fields: object.memo)
43+
end
3744
end
3845
end
3946
end

lib/temporal/metadata.rb

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def generate_activity_metadata(task, namespace)
2020
workflow_run_id: task.workflow_execution.run_id,
2121
workflow_id: task.workflow_execution.workflow_id,
2222
workflow_name: task.workflow_type.name,
23-
headers: headers(task.header&.fields),
23+
headers: from_payload_map(task.header&.fields || {}),
2424
heartbeat_details: from_details_payloads(task.heartbeat_details)
2525
)
2626
end
@@ -48,20 +48,11 @@ def generate_workflow_metadata(event, task_metadata)
4848
attempt: event.attributes.attempt,
4949
namespace: task_metadata.namespace,
5050
task_queue: event.attributes.task_queue.name,
51-
headers: headers(event.attributes.header&.fields),
51+
headers: from_payload_map(event.attributes.header&.fields || {}),
5252
run_started_at: event.timestamp,
53+
memo: from_payload_map(event.attributes.memo&.fields || {}),
5354
)
5455
end
55-
56-
private
57-
58-
def headers(fields)
59-
result = {}
60-
fields.each do |field, payload|
61-
result[field] = from_payload(payload)
62-
end
63-
result
64-
end
6556
end
6657
end
6758
end

lib/temporal/metadata/workflow.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
module Temporal
44
module Metadata
55
class Workflow < Base
6-
attr_reader :namespace, :id, :name, :run_id, :attempt, :task_queue, :headers, :run_started_at
6+
attr_reader :namespace, :id, :name, :run_id, :attempt, :task_queue, :headers, :run_started_at, :memo
77

8-
def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:, run_started_at:)
8+
def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:, run_started_at:, memo:)
99
@namespace = namespace
1010
@id = id
1111
@name = name
@@ -14,6 +14,7 @@ def initialize(namespace:, id:, name:, run_id:, attempt:, task_queue:, headers:,
1414
@task_queue = task_queue
1515
@headers = headers
1616
@run_started_at = run_started_at
17+
@memo = memo
1718

1819
freeze
1920
end
@@ -31,6 +32,7 @@ def to_h
3132
'attempt' => attempt,
3233
'task_queue' => task_queue,
3334
'run_started_at' => run_started_at.to_f,
35+
'memo' => memo,
3436
}
3537
end
3638
end

lib/temporal/testing/temporal_override.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def start_locally(workflow, schedule, *input, **args)
8181
reuse_policy = options[:workflow_id_reuse_policy] || :allow_failed
8282
workflow_id = options[:workflow_id] || SecureRandom.uuid
8383
run_id = SecureRandom.uuid
84+
memo = options[:memo] || {}
8485

8586
if !allowed?(workflow_id, reuse_policy)
8687
raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(
@@ -101,6 +102,7 @@ def start_locally(workflow, schedule, *input, **args)
101102
attempt: 1,
102103
task_queue: execution_options.task_queue,
103104
run_started_at: Time.now,
105+
memo: memo,
104106
headers: execution_options.headers
105107
)
106108
context = Temporal::Testing::LocalWorkflowContext.new(

0 commit comments

Comments
 (0)