Skip to content

Commit 14a9135

Browse files
authored
Pub/Sub: Replacing AutoClosable with BackgrondResource (#5710)
1 parent 7bce752 commit 14a9135

1 file changed

Lines changed: 7 additions & 17 deletions

File tree

  • google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1

google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import com.google.api.gax.batching.FlowControlSettings;
2727
import com.google.api.gax.batching.FlowController;
2828
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
29+
import com.google.api.gax.core.BackgroundResource;
2930
import com.google.api.gax.core.CredentialsProvider;
3031
import com.google.api.gax.core.Distribution;
32+
import com.google.api.gax.core.ExecutorAsBackgroundResource;
3133
import com.google.api.gax.core.ExecutorProvider;
3234
import com.google.api.gax.core.InstantiatingExecutorProvider;
3335
import com.google.api.gax.rpc.HeaderProvider;
@@ -117,7 +119,7 @@ public class Subscriber extends AbstractApiService {
117119
private final MessageReceiver receiver;
118120
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
119121
private final ApiClock clock;
120-
private final List<AutoCloseable> closeables = new ArrayList<>();
122+
private final List<BackgroundResource> backgroundResources = new ArrayList<>();
121123

122124
private Subscriber(Builder builder) {
123125
receiver = builder.receiver;
@@ -143,13 +145,7 @@ private Subscriber(Builder builder) {
143145
alarmsExecutor = systemExecutorProvider.getExecutor();
144146

145147
if (systemExecutorProvider.shouldAutoClose()) {
146-
closeables.add(
147-
new AutoCloseable() {
148-
@Override
149-
public void close() {
150-
alarmsExecutor.shutdown();
151-
}
152-
});
148+
backgroundResources.add(new ExecutorAsBackgroundResource((alarmsExecutor)));
153149
}
154150

155151
TransportChannelProvider channelProvider = builder.channelProvider;
@@ -298,8 +294,8 @@ public void run() {
298294
try {
299295
// stop connection is no-op if connections haven't been started.
300296
stopAllStreamingConnections();
301-
for (AutoCloseable closeable : closeables) {
302-
closeable.close();
297+
for (BackgroundResource resource : backgroundResources) {
298+
resource.shutdown();
303299
}
304300
notifyStopped();
305301
} catch (Exception e) {
@@ -315,13 +311,7 @@ private void startStreamingConnections() {
315311
for (int i = 0; i < numPullers; i++) {
316312
final ScheduledExecutorService executor = executorProvider.getExecutor();
317313
if (executorProvider.shouldAutoClose()) {
318-
closeables.add(
319-
new AutoCloseable() {
320-
@Override
321-
public void close() {
322-
executor.shutdown();
323-
}
324-
});
314+
backgroundResources.add(new ExecutorAsBackgroundResource((executor)));
325315
}
326316

327317
streamingSubscriberConnections.add(

0 commit comments

Comments
 (0)