Skip to content

Commit 6310510

Browse files
authored
samples: sample for receiving messages with exactly-once delivery enabled (#588)
* Add receive_messages_with_exactly_once_delivery_enabled sample with its own region tag * Address Tianzi and Mahesh's comments. * Add code for arg parsing / integrate sample with infra * Add sample test * Reformat and remove min lease extension period setting from sample * Address Tianzi's comments. * Fix import of subscriber exceptions.
1 parent dd95c22 commit 6310510

2 files changed

Lines changed: 98 additions & 0 deletions

File tree

packages/google-cloud-pubsub/samples/snippets/subscriber.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,61 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
580580
# [END pubsub_subscriber_blocking_shutdown]
581581

582582

583+
def receive_messages_with_exactly_once_delivery_enabled(
584+
project_id: str, subscription_id: str, timeout: Optional[float] = None
585+
) -> None:
586+
"""Receives messages from a pull subscription with exactly-once delivery enabled."""
587+
# [START pubsub_subscriber_exactly_once]
588+
from concurrent.futures import TimeoutError
589+
from google.cloud import pubsub_v1
590+
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions
591+
592+
# TODO(developer)
593+
# project_id = "your-project-id"
594+
# subscription_id = "your-subscription-id"
595+
# Number of seconds the subscriber should listen for messages
596+
# timeout = 5.0
597+
598+
subscriber = pubsub_v1.SubscriberClient()
599+
# The `subscription_path` method creates a fully qualified identifier
600+
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
601+
subscription_path = subscriber.subscription_path(project_id, subscription_id)
602+
603+
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
604+
print(f"Received {message}.")
605+
606+
# Use `ack_with_response()` instead of `ack()` to get a future that tracks
607+
# the result of the acknowledge call. When exactly-once delivery is enabled
608+
# on the subscription, the message is guaranteed to not be delivered again
609+
# if the ack future succeeds.
610+
ack_future = message.ack_with_response()
611+
612+
try:
613+
# Block on result of acknowledge call.
614+
# When `timeout` is not set, result() will block indefinitely,
615+
# unless an exception is encountered first.
616+
ack_future.result(timeout=timeout)
617+
print(f"Ack for message {message.message_id} successful.")
618+
except sub_exceptions.AcknowledgeError as e:
619+
print(
620+
f"Ack for message {message.message_id} failed with error: {e.error_code}"
621+
)
622+
623+
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
624+
print(f"Listening for messages on {subscription_path}..\n")
625+
626+
# Wrap subscriber in a 'with' block to automatically call close() when done.
627+
with subscriber:
628+
try:
629+
# When `timeout` is not set, result() will block indefinitely,
630+
# unless an exception is encountered first.
631+
streaming_pull_future.result(timeout=timeout)
632+
except TimeoutError:
633+
streaming_pull_future.cancel() # Trigger the shutdown.
634+
streaming_pull_future.result() # Block until the shutdown is complete.
635+
# [END pubsub_subscriber_exactly_once]
636+
637+
583638
def synchronous_pull(project_id: str, subscription_id: str) -> None:
584639
"""Pulling messages synchronously."""
585640
# [START pubsub_subscriber_sync_pull]
@@ -881,6 +936,17 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
881936
"timeout", default=None, type=float, nargs="?"
882937
)
883938

939+
receive_messages_with_exactly_once_delivery_enabled_parser = subparsers.add_parser(
940+
"receive-messages-with-exactly-once-delivery-enabled",
941+
help=receive_messages_with_exactly_once_delivery_enabled.__doc__,
942+
)
943+
receive_messages_with_exactly_once_delivery_enabled_parser.add_argument(
944+
"subscription_id"
945+
)
946+
receive_messages_with_exactly_once_delivery_enabled_parser.add_argument(
947+
"timeout", default=None, type=float, nargs="?"
948+
)
949+
884950
synchronous_pull_parser = subparsers.add_parser(
885951
"receive-synchronously", help=synchronous_pull.__doc__
886952
)
@@ -967,6 +1033,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
9671033
receive_messages_with_blocking_shutdown(
9681034
args.project_id, args.subscription_id, args.timeout
9691035
)
1036+
elif args.command == "receive-messages-with-exactly-once-delivery-enabled":
1037+
receive_messages_with_exactly_once_delivery_enabled(
1038+
args.project_id, args.subscription_id, args.timeout
1039+
)
9701040
elif args.command == "receive-synchronously":
9711041
synchronous_pull(args.project_id, args.subscription_id)
9721042
elif args.command == "receive-synchronously-with-lease":

packages/google-cloud-pubsub/samples/snippets/subscriber_test.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,34 @@ def eventually_consistent_test() -> None:
624624
eventually_consistent_test()
625625

626626

627+
def test_receive_messages_with_exactly_once_delivery_enabled(
628+
publisher_client: pubsub_v1.PublisherClient,
629+
topic: str,
630+
subscription_async: str,
631+
capsys: CaptureFixture[str],
632+
) -> None:
633+
634+
typed_backoff = cast(
635+
Callable[[C], C], backoff.on_exception(backoff.expo, Unknown, max_time=60),
636+
)
637+
638+
@typed_backoff
639+
def eventually_consistent_test() -> None:
640+
_publish_messages(publisher_client, topic)
641+
642+
subscriber.receive_messages_with_exactly_once_delivery_enabled(
643+
PROJECT_ID, SUBSCRIPTION_ASYNC, 10
644+
)
645+
646+
out, _ = capsys.readouterr()
647+
assert "Listening" in out
648+
assert subscription_async in out
649+
assert "Received" in out
650+
assert "Ack" in out
651+
652+
eventually_consistent_test()
653+
654+
627655
def test_listen_for_errors(
628656
publisher_client: pubsub_v1.PublisherClient,
629657
topic: str,

0 commit comments

Comments
 (0)