Skip to content

Commit 87dd418

Browse files
authored
pubsub: translate exception to ApiException (#2531)
Fixes #2512.
1 parent 23bb30e commit 87dd418

File tree

3 files changed

+24
-7
lines changed

3 files changed

+24
-7
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import com.google.api.gax.core.ExecutorProvider;
2424
import com.google.api.gax.core.InstantiatingExecutorProvider;
2525
import com.google.api.gax.grpc.ChannelProvider;
26+
import com.google.api.gax.grpc.GrpcApiExceptionFactory;
2627
import com.google.api.gax.retrying.RetrySettings;
28+
import com.google.api.gax.rpc.ApiException;
2729
import com.google.auth.Credentials;
2830
import com.google.auth.oauth2.GoogleCredentials;
2931
import com.google.common.annotations.VisibleForTesting;
@@ -367,9 +369,12 @@ public void onFailure(Throwable t) {
367369
|| System.currentTimeMillis() + nextBackoffDelay
368370
> outstandingBatch.creationTime + retrySettings.getTotalTimeout().toMillis()) {
369371
try {
372+
ApiException gaxException =
373+
GrpcApiExceptionFactory.createException(
374+
t, Status.fromThrowable(t).getCode(), false);
370375
for (OutstandingPublish outstandingPublish :
371376
outstandingBatch.outstandingPublishes) {
372-
outstandingPublish.publishResult.setException(t);
377+
outstandingPublish.publishResult.setException(gaxException);
373378
}
374379
} finally {
375380
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.google.api.core.ApiClock;
2121
import com.google.api.gax.batching.FlowController;
2222
import com.google.api.gax.core.Distribution;
23+
import com.google.api.gax.grpc.GrpcApiExceptionFactory;
24+
import com.google.api.gax.rpc.ApiException;
2325
import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor;
2426
import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline;
2527
import com.google.common.annotations.VisibleForTesting;
@@ -230,8 +232,11 @@ public void onFailure(Throwable cause) {
230232
return;
231233
}
232234
if (!StatusUtil.isRetryable(cause)) {
233-
logger.log(Level.SEVERE, "terminated streaming with exception", cause);
234-
notifyFailed(cause);
235+
ApiException gaxException =
236+
GrpcApiExceptionFactory.createException(
237+
cause, Status.fromThrowable(cause).getCode(), false);
238+
logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
239+
notifyFailed(gaxException);
235240
return;
236241
}
237242
logger.log(Level.FINE, "stream closed with retryable exception; will reconnect", cause);

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.google.api.gax.core.FixedExecutorProvider;
2424
import com.google.api.gax.core.InstantiatingExecutorProvider;
2525
import com.google.api.gax.grpc.FixedChannelProvider;
26+
import com.google.api.gax.grpc.GrpcStatusCode;
27+
import com.google.api.gax.rpc.ApiException;
2628
import com.google.cloud.pubsub.v1.FakeSubscriberServiceImpl.ModifyAckDeadline;
2729
import com.google.cloud.pubsub.v1.Subscriber.Builder;
2830
import com.google.common.base.Function;
@@ -38,7 +40,6 @@
3840
import io.grpc.Server;
3941
import io.grpc.Status;
4042
import io.grpc.StatusException;
41-
import io.grpc.StatusRuntimeException;
4243
import io.grpc.inprocess.InProcessChannelBuilder;
4344
import io.grpc.inprocess.InProcessServerBuilder;
4445
import java.util.ArrayList;
@@ -517,9 +518,15 @@ public void testFailedChannel_fatalError_subscriberFails() throws Exception {
517518
} finally {
518519
// The subscriber must finish with an state error because its FAILED status.
519520
assertEquals(Subscriber.State.FAILED, subscriber.state());
520-
assertEquals(
521-
Status.INVALID_ARGUMENT,
522-
((StatusRuntimeException) subscriber.failureCause()).getStatus());
521+
522+
Throwable t = subscriber.failureCause();
523+
assertTrue(t instanceof ApiException);
524+
525+
ApiException ex = (ApiException) (t);
526+
assertTrue(ex.getStatusCode() instanceof GrpcStatusCode);
527+
528+
GrpcStatusCode grpcCode = (GrpcStatusCode) ex.getStatusCode();
529+
assertEquals(Status.Code.INVALID_ARGUMENT, grpcCode.getCode());
523530
}
524531
}
525532

0 commit comments

Comments
 (0)