Skip to content

Commit 331e616

Browse files
authored
Merge pull request #3307 from geigerj/subscription-fields
Add new subscription fields
2 parents 7100b84 + 7c90041 commit 331e616

9 files changed

Lines changed: 226 additions & 35 deletions

File tree

core/google/cloud/_helpers.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -439,16 +439,9 @@ def _timedelta_to_duration_pb(timedelta_val):
439439
:rtype: :class:`google.protobuf.duration_pb2.Duration`
440440
:returns: A duration object equivalent to the time delta.
441441
"""
442-
seconds_decimal = timedelta_val.total_seconds()
443-
# Truncate the parts other than the integer.
444-
seconds = int(seconds_decimal)
445-
if seconds_decimal < 0:
446-
signed_micros = timedelta_val.microseconds - 10**6
447-
else:
448-
signed_micros = timedelta_val.microseconds
449-
# Convert nanoseconds to microseconds.
450-
nanos = 1000 * signed_micros
451-
return duration_pb2.Duration(seconds=seconds, nanos=nanos)
442+
duration_pb = duration_pb2.Duration()
443+
duration_pb.FromTimedelta(timedelta_val)
444+
return duration_pb
452445

453446

454447
def _duration_pb_to_timedelta(duration_pb):

pubsub/google/cloud/pubsub/_gax.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from google.cloud._helpers import _to_bytes
3232
from google.cloud._helpers import _pb_timestamp_to_rfc3339
33+
from google.cloud._helpers import _timedelta_to_duration_pb
3334
from google.cloud._helpers import make_secure_channel
3435
from google.cloud._http import DEFAULT_USER_AGENT
3536
from google.cloud.exceptions import Conflict
@@ -276,7 +277,9 @@ def list_subscriptions(self, project, page_size=0, page_token=None):
276277
return GAXIterator(self._client, page_iter, item_to_value)
277278

278279
def subscription_create(self, subscription_path, topic_path,
279-
ack_deadline=None, push_endpoint=None):
280+
ack_deadline=None, push_endpoint=None,
281+
retain_acked_messages=None,
282+
message_retention_duration=None):
280283
"""API call: create a subscription
281284
282285
See:
@@ -302,6 +305,18 @@ def subscription_create(self, subscription_path, topic_path,
302305
(Optional) URL to which messages will be pushed by the back-end.
303306
If not set, the application must pull messages.
304307
308+
:type retain_acked_messages: bool
309+
:param retain_acked_messages:
310+
(Optional) Whether to retain acked messages. If set, acked messages
311+
are retained in the subscription's backlog for a duration indicated
312+
by `message_retention_duration`.
313+
314+
:type message_retention_duration: :class:`datetime.timedelta`
315+
:param message_retention_duration:
316+
(Optional) Whether to retain acked messages. If set, acked messages
317+
are retained in the subscription's backlog for a duration indicated
318+
by `message_retention_duration`. If unset, defaults to 7 days.
319+
305320
:rtype: dict
306321
:returns: ``Subscription`` resource returned from the API.
307322
"""
@@ -310,13 +325,16 @@ def subscription_create(self, subscription_path, topic_path,
310325
else:
311326
push_config = None
312327

313-
if ack_deadline is None:
314-
ack_deadline = 0
328+
if message_retention_duration is not None:
329+
message_retention_duration = _timedelta_to_duration_pb(
330+
message_retention_duration)
315331

316332
try:
317333
sub_pb = self._gax_api.create_subscription(
318334
subscription_path, topic_path,
319-
push_config=push_config, ack_deadline_seconds=ack_deadline)
335+
push_config=push_config, ack_deadline_seconds=ack_deadline,
336+
retain_acked_messages=retain_acked_messages,
337+
message_retention_duration=message_retention_duration)
320338
except GaxError as exc:
321339
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
322340
raise Conflict(topic_path)

pubsub/google/cloud/pubsub/_http.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import os
2121

2222
from google.cloud import _http
23+
from google.cloud._helpers import _timedelta_to_duration_pb
2324
from google.cloud.environment_vars import PUBSUB_EMULATOR
2425
from google.cloud.iterator import HTTPIterator
2526

@@ -295,7 +296,9 @@ def list_subscriptions(self, project, page_size=None, page_token=None):
295296
extra_params=extra_params)
296297

297298
def subscription_create(self, subscription_path, topic_path,
298-
ack_deadline=None, push_endpoint=None):
299+
ack_deadline=None, push_endpoint=None,
300+
retain_acked_messages=None,
301+
message_retention_duration=None):
299302
"""API call: create a subscription
300303
301304
See:
@@ -321,6 +324,18 @@ def subscription_create(self, subscription_path, topic_path,
321324
(Optional) URL to which messages will be pushed by the back-end.
322325
If not set, the application must pull messages.
323326
327+
:type retain_acked_messages: bool
328+
:param retain_acked_messages:
329+
(Optional) Whether to retain acked messages. If set, acked messages
330+
are retained in the subscription's backlog for a duration indicated
331+
by `message_retention_duration`.
332+
333+
:type message_retention_duration: :class:`datetime.timedelta`
334+
:param message_retention_duration:
335+
(Optional) Whether to retain acked messages. If set, acked messages
336+
are retained in the subscription's backlog for a duration indicated
337+
by `message_retention_duration`. If unset, defaults to 7 days.
338+
324339
:rtype: dict
325340
:returns: ``Subscription`` resource returned from the API.
326341
"""
@@ -333,6 +348,16 @@ def subscription_create(self, subscription_path, topic_path,
333348
if push_endpoint is not None:
334349
resource['pushConfig'] = {'pushEndpoint': push_endpoint}
335350

351+
if retain_acked_messages is not None:
352+
resource['retainAckedMessages'] = retain_acked_messages
353+
354+
if message_retention_duration is not None:
355+
pb = _timedelta_to_duration_pb(message_retention_duration)
356+
resource['messageRetentionDuration'] = {
357+
'seconds': pb.seconds,
358+
'nanos': pb.nanos
359+
}
360+
336361
return self.api_request(method='PUT', path=path, data=resource)
337362

338363
def subscription_get(self, subscription_path):

pubsub/google/cloud/pubsub/subscription.py

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""Define API Subscriptions."""
1616

17+
import datetime
18+
1719
from google.cloud.exceptions import NotFound
1820
from google.cloud.pubsub._helpers import topic_name_from_path
1921
from google.cloud.pubsub.iam import Policy
@@ -43,6 +45,19 @@ class Subscription(object):
4345
(Optional) URL to which messages will be pushed by the back-end. If
4446
not set, the application must pull messages.
4547
48+
:type retain_acked_messages: bool
49+
:param retain_acked_messages:
50+
(Optional) Whether to retain acked messages. If set, acked messages
51+
are retained in the subscription's backlog for a duration indicated
52+
by `message_retention_duration`.
53+
54+
:type message_retention_duration: :class:`datetime.timedelta`
55+
:param message_retention_duration:
56+
(Optional) Whether to retain acked messages. If set, acked messages
57+
are retained in the subscription's backlog for a duration indicated
58+
by `message_retention_duration`. If unset, defaults to 7 days.
59+
60+
4661
:type client: :class:`~google.cloud.pubsub.client.Client`
4762
:param client:
4863
(Optional) The client to use. If not passed, falls back to the
@@ -57,6 +72,7 @@ class Subscription(object):
5772
"""
5873

5974
def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None,
75+
retain_acked_messages=None, message_retention_duration=None,
6076
client=None):
6177

6278
if client is None and topic is None:
@@ -71,6 +87,8 @@ def __init__(self, name, topic=None, ack_deadline=None, push_endpoint=None,
7187
self._project = self._client.project
7288
self.ack_deadline = ack_deadline
7389
self.push_endpoint = push_endpoint
90+
self.retain_acked_messages = retain_acked_messages
91+
self.message_retention_duration = message_retention_duration
7492

7593
@classmethod
7694
def from_api_repr(cls, resource, client, topics=None):
@@ -107,10 +125,21 @@ def from_api_repr(cls, resource, client, topics=None):
107125
ack_deadline = resource.get('ackDeadlineSeconds')
108126
push_config = resource.get('pushConfig', {})
109127
push_endpoint = push_config.get('pushEndpoint')
128+
retain_acked_messages = resource.get('retainAckedMessages')
129+
resource_duration = resource.get('duration', {})
130+
message_retention_duration = datetime.timedelta(
131+
seconds=resource_duration.get('seconds', 0),
132+
microseconds=resource_duration.get('nanos', 0) / 1000)
110133
if topic is None:
111134
return cls(name, ack_deadline=ack_deadline,
112-
push_endpoint=push_endpoint, client=client)
113-
return cls(name, topic, ack_deadline, push_endpoint)
135+
push_endpoint=push_endpoint,
136+
retain_acked_messages=retain_acked_messages,
137+
message_retention_duration=message_retention_duration,
138+
client=client)
139+
return cls(name, topic=topic, ack_deadline=ack_deadline,
140+
push_endpoint=push_endpoint,
141+
retain_acked_messages=retain_acked_messages,
142+
message_retention_duration=message_retention_duration)
114143

115144
@property
116145
def project(self):
@@ -182,8 +211,10 @@ def create(self, client=None):
182211
client = self._require_client(client)
183212
api = client.subscriber_api
184213
api.subscription_create(
185-
self.full_name, self.topic.full_name, self.ack_deadline,
186-
self.push_endpoint)
214+
self.full_name, self.topic.full_name,
215+
ack_deadline=self.ack_deadline, push_endpoint=self.push_endpoint,
216+
retain_acked_messages=self.retain_acked_messages,
217+
message_retention_duration=self.message_retention_duration)
187218

188219
def exists(self, client=None):
189220
"""API call: test existence of the subscription via a GET request

pubsub/google/cloud/pubsub/topic.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ def __init__(self, name, client, timestamp_messages=False):
5252
self._client = client
5353
self.timestamp_messages = timestamp_messages
5454

55-
def subscription(self, name, ack_deadline=None, push_endpoint=None):
55+
def subscription(self, name, ack_deadline=None, push_endpoint=None,
56+
retain_acked_messages=None,
57+
message_retention_duration=None):
5658
"""Creates a subscription bound to the current topic.
5759
5860
Example: pull-mode subcription, default paramter values
@@ -85,11 +87,25 @@ def subscription(self, name, ack_deadline=None, push_endpoint=None):
8587
back-end. If not set, the application must pull
8688
messages.
8789
90+
:type retain_acked_messages: bool
91+
:param retain_acked_messages:
92+
(Optional) Whether to retain acked messages. If set, acked messages
93+
are retained in the subscription's backlog for a duration indicated
94+
by `message_retention_duration`.
95+
96+
:type message_retention_duration: :class:`datetime.timedelta`
97+
:param message_retention_duration:
98+
(Optional) Whether to retain acked messages. If set, acked messages
99+
are retained in the subscription's backlog for a duration indicated
100+
by `message_retention_duration`. If unset, defaults to 7 days.
101+
88102
:rtype: :class:`Subscription`
89103
:returns: The subscription created with the passed in arguments.
90104
"""
91-
return Subscription(name, self, ack_deadline=ack_deadline,
92-
push_endpoint=push_endpoint)
105+
return Subscription(
106+
name, self, ack_deadline=ack_deadline, push_endpoint=push_endpoint,
107+
retain_acked_messages=retain_acked_messages,
108+
message_retention_duration=message_retention_duration)
93109

94110
@classmethod
95111
def from_api_repr(cls, resource, client):

pubsub/tests/system.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import datetime
1516
import os
1617
import unittest
1718

@@ -155,6 +156,26 @@ def test_create_subscription_w_ack_deadline(self):
155156
self.assertEqual(subscription.ack_deadline, 120)
156157
self.assertIs(subscription.topic, topic)
157158

159+
def test_create_subscription_w_message_retention(self):
160+
TOPIC_NAME = 'create-sub-ack' + unique_resource_id('-')
161+
topic = Config.CLIENT.topic(TOPIC_NAME)
162+
self.assertFalse(topic.exists())
163+
topic.create()
164+
self.to_delete.append(topic)
165+
SUBSCRIPTION_NAME = 'subscribing-now' + unique_resource_id()
166+
duration = datetime.timedelta(hours=12)
167+
subscription = topic.subscription(
168+
SUBSCRIPTION_NAME, retain_acked_messages=True,
169+
message_retention_duration=duration)
170+
self.assertFalse(subscription.exists())
171+
subscription.create()
172+
self.to_delete.append(subscription)
173+
self.assertTrue(subscription.exists())
174+
self.assertEqual(subscription.name, SUBSCRIPTION_NAME)
175+
self.assertTrue(subscription.retain_acked_messages)
176+
self.assertEqual(subscription.message_retention_duration, duration)
177+
self.assertIs(subscription.topic, topic)
178+
158179
def test_list_subscriptions(self):
159180
TOPIC_NAME = 'list-sub' + unique_resource_id('-')
160181
topic = Config.CLIENT.topic(TOPIC_NAME)
@@ -287,3 +308,6 @@ def test_subscription_iam_policy(self):
287308
policy.viewers = viewers
288309
new_policy = subscription.set_iam_policy(policy)
289310
self.assertEqual(new_policy.viewers, policy.viewers)
311+
312+
# TODO(geigerj): set retain_acked_messages=True in snapshot system test once
313+
# PR #3303 is merged

0 commit comments

Comments
 (0)