Skip to content

Commit 62b8958

Browse files
authored
Merge pull request #7 from piercus/develop
Develop
2 parents ac06bc8 + 6d5161b commit 62b8958

7 files changed

Lines changed: 41 additions & 36 deletions

File tree

lib/task.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ Task.prototype.report = function () {
5454

5555
Task.prototype.succeed = function (res) {
5656
this.worker.succeed({
57+
input: this.input,
5758
output: res,
5859
taskToken: this.taskToken,
5960
workerName: this.workerName
@@ -64,6 +65,7 @@ Task.prototype.succeed = function (res) {
6465
Task.prototype.fail = function (err) {
6566
this.worker.fail({
6667
error: err,
68+
input: this.input,
6769
taskToken: this.taskToken,
6870
workerName: this.workerName
6971
});
@@ -72,6 +74,7 @@ Task.prototype.fail = function (err) {
7274

7375
Task.prototype.heartbeat = function () {
7476
this.worker.heartbeat({
77+
input: this.input,
7578
taskToken: this.taskToken,
7679
workerName: this.workerName
7780
});

lib/worker.js

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const EventEmitter = require('events').EventEmitter;
22
const util = require('util');
33
const AWS = require('aws-sdk');
4+
const parser = require('aws-arn-parser');
45

56
const Pooler = require('./pooler.js');
67
const replaceError = require('./replace-error.js');
@@ -28,8 +29,7 @@ function Worker(options) {
2829
this.autoStart = typeof (options.autoStart) === 'boolean' ? options.autoStart : true;
2930

3031
if (!options.activityArn) {
31-
this.emit('error', new Error('activityArn is mandatory inside Worker'));
32-
return;
32+
throw (new Error('activityArn is mandatory inside Worker'));
3333
}
3434

3535
this.concurrency = typeof (options.concurrency) === 'number' ? options.concurrency : 1;
@@ -49,6 +49,12 @@ function Worker(options) {
4949
throw (new TypeError('worker does not define any function'));
5050
}
5151

52+
const {region} = parser(options.activityArn);
53+
54+
if (typeof (region) === 'string' && (this.stepfunction.config.region !== region)) {
55+
throw (new Error(`activity ARN region (${region}) should match with AWS Region (${this.stepfunction.config.region})`));
56+
}
57+
5258
if (this.autoStart) {
5359
setImmediate(() => {
5460
this.start(() => {
@@ -131,9 +137,10 @@ Worker.prototype.execute = function (input, cb, heartbeat) {
131137
Worker.prototype.succeed = function (res) {
132138
const params = Object.assign({}, res, {output: JSON.stringify(res.output)});
133139
delete params.workerName;
140+
delete params.input;
134141
this.stepfunction.sendTaskSuccess(params, err => {
135142
if (err) {
136-
this.emit('error', err);
143+
this.emit('error', {err, input: res.input});
137144
} else {
138145
this.emit('success', res);
139146
}
@@ -150,10 +157,11 @@ Worker.prototype.fail = function (res) {
150157
}
151158
const params = Object.assign({}, res, {error});
152159
delete params.workerName;
160+
delete params.input;
153161
this.logger.debug('sendTaskFailure', res.error);
154162
this.stepfunction.sendTaskFailure(params, err => {
155163
if (err) {
156-
this.emit('error', err);
164+
this.emit('error', {err, input: res.input});
157165
} else {
158166
this.emit('failure', res);
159167
}
@@ -163,11 +171,12 @@ Worker.prototype.fail = function (res) {
163171
Worker.prototype.heartbeat = function (res) {
164172
const params = Object.assign({}, res);
165173
delete params.workerName;
174+
delete params.input;
166175
this.logger.debug('sendTaskHeartbeat');
167176

168177
this.stepfunction.sendTaskHeartbeat(params, err => {
169178
if (err) {
170-
this.emit('error', err);
179+
this.emit('error', {err, input: res.input});
171180
} else {
172181
this.emit('heartbeat', res);
173182
}

package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@
3131
"homepage": "https://github.com/piercus/step-function-worker#readme",
3232
"devDependencies": {
3333
"ava": "^0.25.0",
34-
"bluebird": "^3.5.0",
3534
"semantic-release": "^15.1.7",
36-
"vows": "^0.8.1",
3735
"winston": "^2.4.1",
3836
"xo": "^0.18.2"
3937
},
4038
"dependencies": {
39+
"aws-arn-parser": "^1.0.0",
4140
"aws-sdk": "^2.82.0"
4241
}
4342
}

test/scenarios/failed-fn.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
const test = require('ava').test;
22
const AWS = require('aws-sdk');
3-
const PromiseBlue = require('bluebird');
43
const winston = require('winston');
54

65
const StepFunctionWorker = require('../../index.js');
76
const createActivity = require('../utils/create-activity');
87
const cleanUp = require('../utils/clean-up');
98

10-
const stepfunction = new AWS.StepFunctions();
11-
const stepFunctionPromises = PromiseBlue.promisifyAll(stepfunction);
9+
const stepFunction = new AWS.StepFunctions();
1210

1311
const logger = new winston.Logger({
1412
transports: [new winston.transports.Console({
@@ -83,7 +81,7 @@ test.serial('Step function Activity Worker with A failing worker', t => {
8381
});
8482

8583
worker.on('success', reject);
86-
stepFunctionPromises.startExecutionAsync(params);
84+
stepFunction.startExecution(params).promise();
8785
});
8886
});
8987

test/scenarios/test.js

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
const test = require('ava').test;
2-
const PromiseBlue = require('bluebird');
32
const AWS = require('aws-sdk');
43
const StepFunctionWorker = require('../../index.js');
54
const createActivity = require('../utils/create-activity');
65
const cleanUp = require('../utils/clean-up');
76

8-
const stepfunction = new AWS.StepFunctions();
9-
const stepFunctionPromises = PromiseBlue.promisifyAll(stepfunction);
7+
const stepFunction = new AWS.StepFunctions();
108
const workerName = 'test worker name';
119
const stateMachineName = 'test-state-machine-' + Math.floor(Math.random() * 1000);
1210
const activityName = 'test-step-function-worker-' + Math.floor(Math.random() * 1000);
@@ -89,10 +87,10 @@ test.serial('Step function Activity Worker with 2 consecutive tasks', t => {
8987
});
9088
});
9189

92-
stepFunctionPromises.startExecutionAsync(params);
90+
stepFunction.startExecution(params).promise();
9391
});
9492

95-
stepFunctionPromises.startExecutionAsync(params);
93+
stepFunction.startExecution(params).promise();
9694
});
9795
});
9896

@@ -163,9 +161,9 @@ test.serial('Step function with 3 concurrent worker', t => {
163161
worker.on('success', onSuccess);
164162
worker.on('task', onTask);
165163
worker.on('error', reject);
166-
stepFunctionPromises.startExecutionAsync(params1);
167-
stepFunctionPromises.startExecutionAsync(params2);
168-
stepFunctionPromises.startExecutionAsync(params3);
164+
stepFunction.startExecution(params1).promise();
165+
stepFunction.startExecution(params2).promise();
166+
stepFunction.startExecution(params3).promise();
169167
});
170168
});
171169

test/utils/clean-up.js

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
const PromiseBlue = require('bluebird');
21
const AWS = require('aws-sdk');
32

4-
const stepfunction = new AWS.StepFunctions();
5-
const stepFunctionPromises = PromiseBlue.promisifyAll(stepfunction);
3+
const stepFunction = new AWS.StepFunctions();
64

75
module.exports = function ({
86
activityArn = null,
@@ -11,16 +9,16 @@ module.exports = function ({
119
let p1;
1210
let p2;
1311
if (activityArn) {
14-
p1 = stepFunctionPromises.deleteActivityAsync({
12+
p1 = stepFunction.deleteActivity({
1513
activityArn
16-
});
14+
}).promise();
1715
} else {
1816
p1 = Promise.resolve();
1917
}
2018
if (stateMachineArn) {
21-
p2 = stepFunctionPromises.deleteStateMachineAsync({
19+
p2 = stepFunction.deleteStateMachine({
2220
stateMachineArn
23-
});
21+
}).promise();
2422
} else {
2523
p2 = Promise.resolve();
2624
}

test/utils/create-activity.js

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
const PromiseBlue = require('bluebird');
21
const AWS = require('aws-sdk');
32

4-
const stepfunction = new AWS.StepFunctions();
5-
const stepFunctionPromises = PromiseBlue.promisifyAll(stepfunction);
3+
const stepFunction = new AWS.StepFunctions();
64

75
const stateMachineDefinition = function (options) {
86
return {
@@ -26,20 +24,22 @@ if (!stateMachineRoleArn) {
2624
}
2725

2826
module.exports = function ({context = {}, activityName, workerName, stateMachineName}) {
29-
return stepFunctionPromises
30-
.createActivityAsync({
27+
return stepFunction
28+
.createActivity({
3129
name: activityName
32-
}).bind(context).then(data => {
30+
}).promise().then(data => {
3331
context.activityArn = data.activityArn;
3432
context.workerName = workerName;
35-
}).then(function () {
33+
}).then(() => {
3634
const params = {
37-
definition: JSON.stringify(stateMachineDefinition({activityArn: this.activityArn})), /* Required */
35+
definition: JSON.stringify(stateMachineDefinition({activityArn: context.activityArn})), /* Required */
3836
name: stateMachineName, /* Required */
3937
roleArn: stateMachineRoleArn /* Required */
4038
};
41-
return stepFunctionPromises.createStateMachineAsync(params);
39+
return stepFunction.createStateMachine(params).promise();
4240
}).then(data => {
4341
context.stateMachineArn = data.stateMachineArn;
44-
}).return(context);
42+
}).then(() => {
43+
return context;
44+
});
4545
};

0 commit comments

Comments
 (0)