Skip to content

Commit d1f0b6f

Browse files
authored
Merge pull request #13 from piercus/restart-activity-worker
Restart activity worker
2 parents 7a6b399 + f24712a commit d1f0b6f

10 files changed

Lines changed: 1504 additions & 1092 deletions

File tree

lib/pooler.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
const util = require('util');
2-
const EventEmitter = require('events').EventEmitter;
2+
const {EventEmitter} = require('events');
33
const Task = require('./task.js');
44

55
/**
@@ -34,8 +34,8 @@ Pooler.prototype.stop = function (cb) {
3434
this.removeAllListeners();
3535
cb();
3636
});
37-
// This would be better approach but it does not seem to work
38-
// this._request.abort();
37+
// This would be better approach but it does not seem to work
38+
// this._request.abort();
3939
} else {
4040
cb();
4141
}
@@ -85,9 +85,9 @@ Pooler.prototype.getActivityTask = function () {
8585
}, (err, data) => {
8686
this._request = null;
8787
if (err) {
88-
// Console.log(err);
88+
// Console.log(err);
8989
if (err.code === 'RequestAbortedError') {
90-
// In case of abort, close silently
90+
// In case of abort, close silently
9191
} else {
9292
this.emit('error', err);
9393
}

lib/task.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const EventEmitter = require('events').EventEmitter;
1+
const {EventEmitter} = require('events');
22
const util = require('util');
33

44
/**

lib/worker.js

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const EventEmitter = require('events').EventEmitter;
1+
const {EventEmitter} = require('events');
22
const util = require('util');
33
const AWS = require('aws-sdk');
44
const parser = require('aws-arn-parser');
@@ -69,7 +69,10 @@ function Worker(options) {
6969
* @param {function} callback
7070
*/
7171
Worker.prototype.start = function (cb) {
72-
this.updatePool(cb);
72+
this.updatePool(err => {
73+
this.logger.info('Worker started');
74+
cb(err);
75+
});
7376
};
7477
/**
7578
* Get a report of the actual situation of the worker
@@ -117,25 +120,48 @@ Worker.prototype.removePooler = function (cb) {
117120

118121
/**
119122
* Close the worker, this function might take 60 seconds to finish to do step function design
123+
* remove all the events attached to the worker
120124
* @param {function} callback
121125
*/
122126

123127
Worker.prototype.close = function (cb) {
124-
this.logger.info('Closing the worker ... this might take 60 seconds');
128+
this.stop(cb);
129+
this.removeAllListeners();
130+
};
131+
132+
/**
133+
* Stop the worker
134+
* But does not remove all the events attached to it
135+
* NB: worker.concurrency is set to 0
136+
* @param {function} callback
137+
*/
138+
139+
Worker.prototype.stop = function (cb) {
140+
this.logger.info('Stopping the worker ... this might take 60 seconds');
125141
this.concurrency = 0;
126142
this.updatePool(err => {
127-
this.logger.info('Worker closed');
143+
this.logger.info('Worker stopped');
128144
cb(err);
129145
});
130-
this.removeAllListeners();
146+
};
147+
148+
Worker.prototype.restart = function (cb) {
149+
const oldConcurrency = this.concurrency;
150+
this.stop(err => {
151+
if (err) {
152+
return cb(err);
153+
}
154+
this.concurrency = oldConcurrency;
155+
this.start(cb);
156+
});
131157
};
132158

133159
Worker.prototype.execute = function (input, cb, heartbeat) {
134160
setImmediate(() => {
135161
try {
136162
this.fn(input, cb, heartbeat);
137-
} catch (err) {
138-
cb(err);
163+
} catch (error) {
164+
cb(error);
139165
}
140166
});
141167
};

0 commit comments

Comments
 (0)