@@ -127,7 +127,7 @@ function Subscription(pubsub, options) {
127127 this . autoAck = util . is ( options . autoAck , 'boolean' ) ? options . autoAck : false ;
128128 this . closed = false ;
129129 this . interval = util . is ( options . interval , 'number' ) ? options . interval : 10 ;
130- this . inProgress = 0 ;
130+ this . inProgressAckIds = { } ;
131131 this . maxInProgress =
132132 util . is ( options . maxInProgress , 'number' ) ? options . maxInProgress : Infinity ;
133133 this . messageListeners = 0 ;
@@ -243,7 +243,7 @@ Subscription.prototype.startPulling_ = function() {
243243 var maxResults ;
244244
245245 if ( this . maxInProgress < Infinity ) {
246- maxResults = this . maxInProgress - this . inProgress ;
246+ maxResults = this . maxInProgress - Object . keys ( this . inProgressAckIds ) . length ;
247247 }
248248
249249 this . pull ( {
@@ -284,16 +284,17 @@ Subscription.prototype.ack = function(ackIds, callback) {
284284 'At least one ID must be specified before it can be acknowledged.' ) ;
285285 }
286286
287+ ackIds = util . arrayize ( ackIds ) ;
288+
287289 var body = {
288- ackIds : util . arrayize ( ackIds )
290+ ackIds : ackIds
289291 } ;
290292
291293 callback = callback || util . noop ;
292294
293295 var path = this . name + ':acknowledge' ;
294296
295297 this . makeReq_ ( 'POST' , path , null , body , function ( ) {
296- self . inProgress -- ;
297298 self . refreshPausedStatus_ ( ) ;
298299 callback . apply ( self , arguments ) ;
299300 } ) ;
@@ -391,9 +392,10 @@ Subscription.prototype.pull = function(options, callback) {
391392 }
392393
393394 var messages = response . receivedMessages || [ ] ;
394- messages = messages . map ( Subscription . formatMessage_ ) ;
395+ messages = messages
396+ . map ( Subscription . formatMessage_ )
397+ . map ( self . decorateMessage_ ) ;
395398
396- self . inProgress += messages . length ;
397399 self . refreshPausedStatus_ ( ) ;
398400
399401 if ( self . autoAck && messages . length !== 0 ) {
@@ -441,9 +443,25 @@ Subscription.prototype.setAckDeadline = function(options, callback) {
441443 this . makeReq_ ( 'POST' , path , null , body , callback ) ;
442444} ;
443445
446+ Subscription . prototype . decorateMessage_ = function ( message ) {
447+ var self = this ;
448+
449+ this . inProgressAckIds [ message . ackId ] = true ;
450+
451+ message . ack = self . ack . bind ( self , message . ackId ) ;
452+
453+ message . skip = function ( ) {
454+ delete self . inProgressAckIds [ message . ackId ] ;
455+ self . refreshPausedStatus_ ( ) ;
456+ } ;
457+
458+ return message ;
459+ } ;
460+
444461Subscription . prototype . refreshPausedStatus_ = function ( ) {
445462 var isCurrentlyPaused = this . paused ;
446- this . paused = this . inProgress >= this . maxInProgress ;
463+ var inProgress = Object . keys ( this . inProgressAckIds ) . length ;
464+ this . paused = inProgress >= this . maxInProgress ;
447465
448466 if ( isCurrentlyPaused && ! this . paused && this . messageListeners > 0 ) {
449467 this . startPulling_ ( ) ;
0 commit comments