Skip to content

Commit 3deda64

Browse files
jeffschoner-stripeGitHub Enterprise
authored andcommitted
Merge pull request coinbase#106 from stripe-private-oss-forks/jeffschoner/downstream
Downstream latest temporal-ruby changes
2 parents 1edc8b4 + ee81e0c commit 3deda64

31 files changed

Lines changed: 533 additions & 98 deletions
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
class TerminateWorkflowActivity < Temporal::Activity
2+
def execute(namespace, workflow_id, run_id)
3+
Temporal.terminate_workflow(workflow_id, namespace: namespace, run_id: run_id)
4+
end
5+
end

examples/bin/worker

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ worker.register_workflow(BranchingWorkflow)
2525
worker.register_workflow(CallFailingActivityWorkflow)
2626
worker.register_workflow(CancellingTimerWorkflow)
2727
worker.register_workflow(CheckWorkflow)
28+
worker.register_workflow(ChildWorkflowTimeoutWorkflow)
29+
worker.register_workflow(ChildWorkflowTerminatedWorkflow)
2830
worker.register_workflow(FailingActivitiesWorkflow)
2931
worker.register_workflow(FailingWorkflow)
3032
worker.register_workflow(HelloWorldWorkflow)
@@ -42,6 +44,7 @@ worker.register_workflow(QuickTimeoutWorkflow)
4244
worker.register_workflow(RandomlyFailingWorkflow)
4345
worker.register_workflow(ReleaseWorkflow)
4446
worker.register_workflow(ResultWorkflow)
47+
worker.register_workflow(ScheduleChildWorkflow)
4548
worker.register_workflow(SendSignalToExternalWorkflow)
4649
worker.register_workflow(SerialHelloWorldWorkflow)
4750
worker.register_workflow(SideEffectWorkflow)
@@ -66,6 +69,7 @@ worker.register_activity(LongRunningActivity)
6669
worker.register_activity(ProcessFileActivity)
6770
worker.register_activity(RandomlyFailingActivity)
6871
worker.register_activity(RandomNumberActivity)
72+
worker.register_activity(TerminateWorkflowActivity)
6973
worker.register_activity(SleepActivity)
7074
worker.register_activity(UploadFileActivity)
7175
worker.register_activity(Trip::BookFlightActivity)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
require 'workflows/child_workflow_terminated_workflow.rb'
2+
3+
describe ChildWorkflowTerminatedWorkflow do
4+
subject { described_class }
5+
6+
it 'successfully can catch if a child workflow times out' do
7+
workflow_id = SecureRandom.uuid
8+
9+
Temporal.start_workflow(
10+
subject,
11+
options: { workflow_id: workflow_id }
12+
)
13+
14+
result = Temporal.await_workflow_result(
15+
subject,
16+
workflow_id: workflow_id
17+
)
18+
19+
expect(result[:child_workflow_terminated]).to eq(true)
20+
expect(result[:error]).to be_a(Temporal::ChildWorkflowTerminatedError)
21+
end
22+
end
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
require 'workflows/child_workflow_timeout_workflow.rb'
2+
3+
describe ChildWorkflowTimeoutWorkflow do
4+
subject { described_class }
5+
6+
it 'successfully can catch if a child workflow times out' do
7+
workflow_id = SecureRandom.uuid
8+
9+
Temporal.start_workflow(
10+
subject,
11+
options: { workflow_id: workflow_id }
12+
)
13+
14+
result = Temporal.await_workflow_result(
15+
subject,
16+
workflow_id: workflow_id
17+
)
18+
puts result
19+
expect(result[:child_workflow_failed]).to eq(true)
20+
expect(result[:error]).to be_a(Temporal::ChildWorkflowTimeoutError)
21+
end
22+
end
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
require 'workflows/schedule_child_workflow'
2+
require 'workflows/hello_world_workflow'
3+
4+
describe ScheduleChildWorkflow, :integration do
5+
let(:cron_schedule) { "*/6 * * * *" }
6+
7+
it 'schedules a child workflow with a given cron schedule' do
8+
child_workflow_id = 'schedule_child_test_wf-' + SecureRandom.uuid
9+
workflow_id, run_id = run_workflow(
10+
described_class,
11+
child_workflow_id,
12+
cron_schedule,
13+
options: {
14+
timeouts: { execution: 10 }
15+
}
16+
)
17+
18+
wait_for_workflow_completion(workflow_id, run_id)
19+
parent_history = fetch_history(workflow_id, run_id)
20+
21+
child_workflow_event = parent_history.history.events.detect do |event|
22+
event.event_type == :EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED
23+
end
24+
expect(
25+
child_workflow_event.start_child_workflow_execution_initiated_event_attributes.cron_schedule
26+
).to eq(cron_schedule)
27+
28+
# Expecting the child workflow to terminate as a result of the parent close policy
29+
expect do
30+
Temporal.await_workflow_result(
31+
HelloWorldWorkflow,
32+
workflow_id: child_workflow_id
33+
)
34+
end.to raise_error(Temporal::WorkflowTerminated)
35+
36+
end
37+
end
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
require 'workflows/simple_timer_workflow'
2+
require 'activities/terminate_workflow_activity'
3+
4+
class ChildWorkflowTerminatedWorkflow < Temporal::Workflow
5+
def execute
6+
# start a child workflow that executes for 60 seconds, then attempts to try and terminate that workflow
7+
result = SimpleTimerWorkflow.execute(60)
8+
child_workflow_execution = result.child_workflow_execution_future.get
9+
TerminateWorkflowActivity.execute!(
10+
'ruby-samples',
11+
child_workflow_execution.workflow_id,
12+
child_workflow_execution.run_id
13+
)
14+
15+
# check that the result is now 'failed'
16+
{
17+
child_workflow_terminated: result.failed?, # terminated is represented as failed? with the Terminated Error
18+
error: result.get
19+
}
20+
end
21+
end
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
require 'workflows/quick_timeout_workflow'
2+
3+
class ChildWorkflowTimeoutWorkflow < Temporal::Workflow
4+
def execute
5+
# workflow timesout before it can finish running, we should be able to detect that with .failed?
6+
result = QuickTimeoutWorkflow.execute
7+
8+
result.get # wait for the workflow to finish so we can detect if it failed or not
9+
10+
{
11+
child_workflow_failed: result.failed?,
12+
error: result.get
13+
}
14+
end
15+
end
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class ScheduleChildWorkflow < Temporal::Workflow
2+
def execute(child_workflow_id, cron_schedule)
3+
HelloWorldWorkflow.schedule(cron_schedule, options: { workflow_id: child_workflow_id })
4+
workflow.sleep(1)
5+
end
6+
end

lib/temporal/configuration.rb

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
require 'temporal/connection/converter/payload/nil'
44
require 'temporal/connection/converter/payload/bytes'
55
require 'temporal/connection/converter/payload/json'
6-
require 'temporal/connection/converter/payload/json_protobuf'
6+
require 'temporal/connection/converter/payload/proto_json'
77
require 'temporal/connection/converter/composite'
88

99
module Temporal
@@ -12,8 +12,7 @@ class Configuration
1212
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)
1313

1414
attr_reader :timeouts, :error_handlers
15-
attr_writer :converter
16-
attr_accessor :connection_type, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :max_page_size, :connection_options, :search_attributes
15+
attr_accessor :connection_type, :converter, :host, :port, :credentials, :identity, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :max_page_size, :connection_options, :search_attributes
1716

1817
# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
1918
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
@@ -40,8 +39,8 @@ class Configuration
4039
payload_converters: [
4140
Temporal::Connection::Converter::Payload::Nil.new,
4241
Temporal::Connection::Converter::Payload::Bytes.new,
43-
Temporal::Connection::Converter::Payload::JSON.new,
44-
Temporal::Connection::Converter::Payload::JSONProtobuf.new,
42+
Temporal::Connection::Converter::Payload::ProtoJSON.new,
43+
Temporal::Connection::Converter::Payload::JSON.new
4544
]
4645
).freeze
4746

@@ -77,10 +76,6 @@ def timeouts=(new_timeouts)
7776
@timeouts = DEFAULT_TIMEOUTS.merge(new_timeouts)
7877
end
7978

80-
def converter
81-
@converter
82-
end
83-
8479
def for_connection
8580
Connection.new(
8681
type: connection_type,
@@ -90,25 +85,27 @@ def for_connection
9085
identity: identity || default_identity,
9186
options: {
9287
max_page_size: max_page_size
93-
}.merge(connection_options),
88+
}.merge(connection_options)
9489
).freeze
9590
end
9691

97-
def default_identity
98-
hostname = `hostname`
99-
pid = Process.pid
100-
101-
"#{pid}@#{hostname}"
102-
end
103-
10492
def default_execution_options
10593
Execution.new(
10694
namespace: namespace,
10795
task_queue: task_list,
10896
timeouts: timeouts,
10997
headers: headers,
110-
search_attributes: search_attributes,
98+
search_attributes: search_attributes
11199
).freeze
112100
end
101+
102+
private
103+
104+
def default_identity
105+
hostname = `hostname`
106+
pid = Process.pid
107+
108+
"#{pid}@#{hostname}".freeze
109+
end
113110
end
114111
end

lib/temporal/connection/converter/payload/json_protobuf.rb

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)