File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -209,6 +209,21 @@ Worker.prototype.removePooler = function (pooler) {
209209 }
210210} ;
211211
212+ Worker . prototype . removeTask = function ( pooler ) {
213+ this . logger . debug ( 'removePooler' ) ;
214+
215+ const index = this . _poolers . indexOf ( pooler ) ;
216+ if ( index === - 1 ) {
217+ throw ( new Error ( `pooler ${ pooler } is not in the pooler list` ) ) ;
218+ }
219+
220+ this . _poolers . splice ( index , 1 ) ;
221+
222+ if ( this . _poolers . length === 0 ) {
223+ this . emit ( 'empty-poolers' ) ;
224+ }
225+ } ;
226+
212227// Worker.prototype.removePooler = function () {
213228// if(!this._poolerRemovalPromise){
214229// this._poolerRemovalPromise = Promise.resolve()
@@ -290,10 +305,16 @@ Worker.prototype.stop = function () {
290305
291306Worker . prototype . restart = function ( cb ) {
292307 const oldPoolConcurrency = this . poolConcurrency ;
293- return this . stop ( ) . then ( ( ) => {
308+
309+ const promise = this . stop ( ) . then ( ( ) => {
294310 this . poolConcurrency = oldPoolConcurrency ;
295- return this . start ( cb ) ;
311+ return this . start ( ) ;
296312 } ) ;
313+ if ( cb ) {
314+ promise . catch ( cb ) . then ( ( ) => cb ( ) ) ;
315+ } else {
316+ return promise ;
317+ }
297318} ;
298319
299320util . inherits ( Worker , EventEmitter ) ;
Original file line number Diff line number Diff line change @@ -131,6 +131,7 @@ test.serial('Step function Activity Worker with 200 parallel tasks and heartbeat
131131 return new Promise ( ( resolve , reject ) => {
132132 worker . once ( 'empty' , ( ) => {
133133 t . is ( count , totalTasks ) ;
134+ t . true ( countFull > 0 ) ;
134135 // T.is(Math.abs(countFull - (totalTasks-taskConcurrency))/totalTasks)
135136 const endDate = new Date ( ) ;
136137 worker . logger . info ( `Spent ${ ( endDate - startDate ) / 1000 } seconds` ) ;
Original file line number Diff line number Diff line change @@ -197,7 +197,9 @@ test.serial('Restart the worker', t => {
197197
198198 if ( countSuccess === 1 ) {
199199 const beforeRestartLength = worker . _poolers . length ;
200+ console . log ( 'restart' ) ;
200201 worker . restart ( ( ) => {
202+ console . log ( 'restarted' ) ;
201203 t . is ( worker . _poolers . length , beforeRestartLength ) ;
202204 stepFunction . startExecution ( params2 ) . promise ( ) ;
203205 } ) ;
You can’t perform that action at this time.
0 commit comments