Skip to content

Commit 7b9807d

Browse files
dpcollins-googlesduskis
authored andcommitted
Remove global synchronization from MessageDispatcher. (#4620)
* Remove global synchronization from MessageDispatcher. Now that this uses a LinkedBlockingDeque for batches, this is no longer necessary. * Run code format.
1 parent 7971d0b commit 7b9807d

1 file changed

Lines changed: 20 additions & 26 deletions

File tree

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -365,38 +365,32 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
365365
}
366366

367367
messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
368-
synchronized (outstandingMessageBatches) {
369-
outstandingMessageBatches.add(outstandingBatch);
370-
}
368+
outstandingMessageBatches.add(outstandingBatch);
371369
processOutstandingBatches();
372370
}
373371

374372
private void processOutstandingBatches() {
375-
synchronized (outstandingMessageBatches) {
376-
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
377-
nextBatch != null;
378-
nextBatch = outstandingMessageBatches.poll()) {
379-
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
380-
nextMessage != null;
381-
nextMessage = nextBatch.messages.poll()) {
382-
try {
383-
// This is a non-blocking flow controller.
384-
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
385-
} catch (FlowController.MaxOutstandingElementCountReachedException
386-
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
387-
// Unwind previous changes in the batches outstanding.
388-
nextBatch.messages.addFirst(nextMessage);
389-
outstandingMessageBatches.addFirst(nextBatch);
390-
return;
391-
} catch (FlowControlException unexpectedException) {
392-
throw new IllegalStateException(
393-
"Flow control unexpected exception", unexpectedException);
394-
}
395-
processOutstandingMessage(
396-
nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
373+
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
374+
nextBatch != null;
375+
nextBatch = outstandingMessageBatches.poll()) {
376+
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
377+
nextMessage != null;
378+
nextMessage = nextBatch.messages.poll()) {
379+
try {
380+
// This is a non-blocking flow controller.
381+
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
382+
} catch (FlowController.MaxOutstandingElementCountReachedException
383+
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
384+
// Unwind previous changes in the batches outstanding.
385+
nextBatch.messages.addFirst(nextMessage);
386+
outstandingMessageBatches.addFirst(nextBatch);
387+
return;
388+
} catch (FlowControlException unexpectedException) {
389+
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
397390
}
398-
nextBatch.doneCallback.run();
391+
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
399392
}
393+
nextBatch.doneCallback.run();
400394
}
401395
}
402396

0 commit comments

Comments
 (0)