Skip to content

Commit 4e6fb21

Browse files
authored
Add back support for retries in storage uploads. (#3378)
1 parent 8c65df8 commit 4e6fb21

3 files changed

Lines changed: 87 additions & 30 deletions

File tree

packages/google-cloud-storage/google/cloud/storage/blob.py

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,13 @@
7171
'storageClass',
7272
)
7373
_NUM_RETRIES_MESSAGE = (
74-
'num_retries is no longer supported. When a transient error occurs, '
75-
'such as a 429 Too Many Requests or 500 Internal Server Error, upload '
76-
'requests will be automatically retried. Subsequent retries will be '
77-
'done after waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until '
78-
'10 minutes of wait time have elapsed. At that point, there will be no '
79-
'more attempts to retry.')
74+
'`num_retries` has been deprecated and will be removed in a future '
75+
'release. The default behavior (when `num_retries` is not specified) when '
76+
'a transient error (e.g. 429 Too Many Requests or 500 Internal Server '
77+
'Error) occurs will be as follows: upload requests will be automatically '
78+
'retried. Subsequent retries will be sent after waiting 1, 2, 4, 8, etc. '
79+
'seconds (exponential backoff) until 10 minutes of wait time have '
80+
'elapsed. At that point, there will be no more attempts to retry.')
8081
_READ_LESS_THAN_SIZE = (
8182
'Size {:d} was specified but the file-like object only had '
8283
'{:d} bytes remaining.')
@@ -583,7 +584,8 @@ def _get_upload_arguments(self, content_type):
583584
content_type = self._get_content_type(content_type)
584585
return headers, object_metadata, content_type
585586

586-
def _do_multipart_upload(self, client, stream, content_type, size):
587+
def _do_multipart_upload(self, client, stream, content_type,
588+
size, num_retries):
587589
"""Perform a multipart upload.
588590
589591
Assumes ``chunk_size`` is :data:`None` on the current blob.
@@ -610,6 +612,10 @@ def _do_multipart_upload(self, client, stream, content_type, size):
610612
from ``stream``). If not provided, the upload will be
611613
concluded once ``stream`` is exhausted (or :data:`None`).
612614
615+
:type num_retries: int
616+
:param num_retries: Number of upload retries. (Deprecated: This
617+
argument will be removed in a future release.)
618+
613619
:rtype: :class:`~requests.Response`
614620
:returns: The "200 OK" response object returned after the multipart
615621
upload request.
@@ -631,13 +637,19 @@ def _do_multipart_upload(self, client, stream, content_type, size):
631637
upload_url = _MULTIPART_URL_TEMPLATE.format(
632638
bucket_path=self.bucket.path)
633639
upload = MultipartUpload(upload_url, headers=headers)
640+
641+
if num_retries is not None:
642+
upload._retry_strategy = resumable_media.RetryStrategy(
643+
max_retries=num_retries)
644+
634645
response = upload.transmit(
635646
transport, data, object_metadata, content_type)
636647

637648
return response
638649

639650
def _initiate_resumable_upload(self, client, stream, content_type,
640-
size, extra_headers=None, chunk_size=None):
651+
size, num_retries, extra_headers=None,
652+
chunk_size=None):
641653
"""Initiate a resumable upload.
642654
643655
The content type of the upload will be determined in order
@@ -662,6 +674,10 @@ def _initiate_resumable_upload(self, client, stream, content_type,
662674
from ``stream``). If not provided, the upload will be
663675
concluded once ``stream`` is exhausted (or :data:`None`).
664676
677+
:type num_retries: int
678+
:param num_retries: Number of upload retries. (Deprecated: This
679+
argument will be removed in a future release.)
680+
665681
:type extra_headers: dict
666682
:param extra_headers: (Optional) Extra headers to add to standard
667683
headers.
@@ -693,13 +709,19 @@ def _initiate_resumable_upload(self, client, stream, content_type,
693709
upload_url = _RESUMABLE_URL_TEMPLATE.format(
694710
bucket_path=self.bucket.path)
695711
upload = ResumableUpload(upload_url, chunk_size, headers=headers)
712+
713+
if num_retries is not None:
714+
upload._retry_strategy = resumable_media.RetryStrategy(
715+
max_retries=num_retries)
716+
696717
upload.initiate(
697718
transport, stream, object_metadata, content_type,
698719
total_bytes=size, stream_final=False)
699720

700721
return upload, transport
701722

702-
def _do_resumable_upload(self, client, stream, content_type, size):
723+
def _do_resumable_upload(self, client, stream, content_type,
724+
size, num_retries):
703725
"""Perform a resumable upload.
704726
705727
Assumes ``chunk_size`` is not :data:`None` on the current blob.
@@ -726,19 +748,23 @@ def _do_resumable_upload(self, client, stream, content_type, size):
726748
from ``stream``). If not provided, the upload will be
727749
concluded once ``stream`` is exhausted (or :data:`None`).
728750
751+
:type num_retries: int
752+
:param num_retries: Number of upload retries. (Deprecated: This
753+
argument will be removed in a future release.)
754+
729755
:rtype: :class:`~requests.Response`
730756
:returns: The "200 OK" response object returned after the final chunk
731757
is uploaded.
732758
"""
733759
upload, transport = self._initiate_resumable_upload(
734-
client, stream, content_type, size)
760+
client, stream, content_type, size, num_retries)
735761

736762
while not upload.finished:
737763
response = upload.transmit_next_chunk(transport)
738764

739765
return response
740766

741-
def _do_upload(self, client, stream, content_type, size):
767+
def _do_upload(self, client, stream, content_type, size, num_retries):
742768
"""Determine an upload strategy and then perform the upload.
743769
744770
If the current blob has a ``chunk_size`` set, then a resumable upload
@@ -767,17 +793,21 @@ def _do_upload(self, client, stream, content_type, size):
767793
from ``stream``). If not provided, the upload will be
768794
concluded once ``stream`` is exhausted (or :data:`None`).
769795
796+
:type num_retries: int
797+
:param num_retries: Number of upload retries. (Deprecated: This
798+
argument will be removed in a future release.)
799+
770800
:rtype: dict
771801
:returns: The parsed JSON from the "200 OK" response. This will be the
772802
**only** response in the multipart case and it will be the
773803
**final** response in the resumable case.
774804
"""
775805
if self.chunk_size is None:
776806
response = self._do_multipart_upload(
777-
client, stream, content_type, size)
807+
client, stream, content_type, size, num_retries)
778808
else:
779809
response = self._do_resumable_upload(
780-
client, stream, content_type, size)
810+
client, stream, content_type, size, num_retries)
781811

782812
return response.json()
783813

@@ -831,7 +861,8 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
831861
:param content_type: Optional type of content being uploaded.
832862
833863
:type num_retries: int
834-
:param num_retries: Number of upload retries. (Deprecated.)
864+
:param num_retries: Number of upload retries. (Deprecated: This
865+
argument will be removed in a future release.)
835866
836867
:type client: :class:`~google.cloud.storage.client.Client`
837868
:param client: (Optional) The client to use. If not passed, falls back
@@ -846,7 +877,7 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
846877
_maybe_rewind(file_obj, rewind=rewind)
847878
try:
848879
created_json = self._do_upload(
849-
client, file_obj, content_type, size)
880+
client, file_obj, content_type, size, num_retries)
850881
self._set_properties(created_json)
851882
except resumable_media.InvalidResponse as exc:
852883
_raise_from_invalid_response(exc)
@@ -1004,7 +1035,7 @@ def create_resumable_upload_session(
10041035
# to the `ResumableUpload` constructor. The chunk size only
10051036
# matters when **sending** bytes to an upload.
10061037
upload, _ = self._initiate_resumable_upload(
1007-
client, dummy_stream, content_type, size,
1038+
client, dummy_stream, content_type, size, None,
10081039
extra_headers=extra_headers,
10091040
chunk_size=self._CHUNK_SIZE_MULTIPLE)
10101041

packages/google-cloud-storage/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
REQUIREMENTS = [
5454
'google-cloud-core >= 0.24.1, < 0.25dev',
5555
'google-auth >= 1.0.0',
56-
'google-resumable-media >= 0.1.0',
56+
'google-resumable-media >= 0.1.1',
5757
'requests >= 2.0.0',
5858
]
5959

packages/google-cloud-storage/tests/unit/test_blob.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,8 @@ def _mock_transport(self, status_code, headers, content=b''):
762762
fake_transport.request.return_value = fake_response
763763
return fake_transport
764764

765-
def _do_multipart_success(self, mock_get_boundary, size=None):
765+
def _do_multipart_success(self, mock_get_boundary, size=None,
766+
num_retries=None):
766767
bucket = mock.Mock(path='/b/w00t', spec=[u'path'])
767768
blob = self._make_one(u'blob-name', bucket=bucket)
768769
self.assertIsNone(blob.chunk_size)
@@ -777,7 +778,7 @@ def _do_multipart_success(self, mock_get_boundary, size=None):
777778
stream = io.BytesIO(data)
778779
content_type = u'application/xml'
779780
response = blob._do_multipart_upload(
780-
client, stream, content_type, size)
781+
client, stream, content_type, size, num_retries)
781782

782783
# Check the mocks and the returned value.
783784
self.assertIs(response, fake_transport.request.return_value)
@@ -817,6 +818,11 @@ def test__do_multipart_upload_no_size(self, mock_get_boundary):
817818
def test__do_multipart_upload_with_size(self, mock_get_boundary):
818819
self._do_multipart_success(mock_get_boundary, size=10)
819820

821+
@mock.patch(u'google.resumable_media._upload.get_boundary',
822+
return_value=b'==0==')
823+
def test__do_multipart_upload_with_retry(self, mock_get_boundary):
824+
self._do_multipart_success(mock_get_boundary, num_retries=8)
825+
820826
def test__do_multipart_upload_bad_size(self):
821827
blob = self._make_one(u'blob-name', bucket=None)
822828

@@ -826,15 +832,15 @@ def test__do_multipart_upload_bad_size(self):
826832
self.assertGreater(size, len(data))
827833

828834
with self.assertRaises(ValueError) as exc_info:
829-
blob._do_multipart_upload(None, stream, None, size)
835+
blob._do_multipart_upload(None, stream, None, size, None)
830836

831837
exc_contents = str(exc_info.exception)
832838
self.assertIn(
833839
'was specified but the file-like object only had', exc_contents)
834840
self.assertEqual(stream.tell(), len(data))
835841

836842
def _initiate_resumable_helper(self, size=None, extra_headers=None,
837-
chunk_size=None):
843+
chunk_size=None, num_retries=None):
838844
from google.resumable_media.requests import ResumableUpload
839845

840846
bucket = mock.Mock(path='/b/whammy', spec=[u'path'])
@@ -862,7 +868,7 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
862868
stream = io.BytesIO(data)
863869
content_type = u'text/plain'
864870
upload, transport = blob._initiate_resumable_upload(
865-
client, stream, content_type, size,
871+
client, stream, content_type, size, num_retries,
866872
extra_headers=extra_headers, chunk_size=chunk_size)
867873

868874
# Check the returned values.
@@ -890,6 +896,14 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
890896
self.assertEqual(upload._total_bytes, size)
891897
self.assertEqual(upload._content_type, content_type)
892898
self.assertEqual(upload.resumable_url, resumable_url)
899+
retry_strategy = upload._retry_strategy
900+
self.assertEqual(retry_strategy.max_sleep, 64.0)
901+
if num_retries is None:
902+
self.assertEqual(retry_strategy.max_cumulative_retry, 600.0)
903+
self.assertIsNone(retry_strategy.max_retries)
904+
else:
905+
self.assertIsNone(retry_strategy.max_cumulative_retry)
906+
self.assertEqual(retry_strategy.max_retries, num_retries)
893907
self.assertIs(transport, fake_transport)
894908
# Make sure we never read from the stream.
895909
self.assertEqual(stream.tell(), 0)
@@ -923,6 +937,9 @@ def test__initiate_resumable_upload_with_extra_headers(self):
923937
extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'}
924938
self._initiate_resumable_helper(extra_headers=extra_headers)
925939

940+
def test__initiate_resumable_upload_with_retry(self):
941+
self._initiate_resumable_helper(num_retries=11)
942+
926943
def _make_resumable_transport(self, headers1, headers2,
927944
headers3, total_bytes):
928945
from google import resumable_media
@@ -990,7 +1007,7 @@ def _do_resumable_upload_call2(blob, content_type, data,
9901007
return mock.call(
9911008
'PUT', resumable_url, data=payload, headers=expected_headers)
9921009

993-
def _do_resumable_helper(self, use_size=False):
1010+
def _do_resumable_helper(self, use_size=False, num_retries=None):
9941011
bucket = mock.Mock(path='/b/yesterday', spec=[u'path'])
9951012
blob = self._make_one(u'blob-name', bucket=bucket)
9961013
blob.chunk_size = blob._CHUNK_SIZE_MULTIPLE
@@ -1017,7 +1034,7 @@ def _do_resumable_helper(self, use_size=False):
10171034
stream = io.BytesIO(data)
10181035
content_type = u'text/html'
10191036
response = blob._do_resumable_upload(
1020-
client, stream, content_type, size)
1037+
client, stream, content_type, size, num_retries)
10211038

10221039
# Check the returned values.
10231040
self.assertIs(response, responses[2])
@@ -1039,7 +1056,10 @@ def test__do_resumable_upload_no_size(self):
10391056
def test__do_resumable_upload_with_size(self):
10401057
self._do_resumable_helper(use_size=True)
10411058

1042-
def _do_upload_helper(self, chunk_size=None):
1059+
def test__do_resumable_upload_with_retry(self):
1060+
self._do_resumable_helper(num_retries=6)
1061+
1062+
def _do_upload_helper(self, chunk_size=None, num_retries=None):
10431063
blob = self._make_one(u'blob-name', bucket=None)
10441064

10451065
# Create a fake response.
@@ -1061,17 +1081,18 @@ def _do_upload_helper(self, chunk_size=None):
10611081
size = 12345654321
10621082

10631083
# Make the request and check the mocks.
1064-
created_json = blob._do_upload(client, stream, content_type, size)
1084+
created_json = blob._do_upload(
1085+
client, stream, content_type, size, num_retries)
10651086
self.assertIs(created_json, mock.sentinel.json)
10661087
response.json.assert_called_once_with()
10671088
if chunk_size is None:
10681089
blob._do_multipart_upload.assert_called_once_with(
1069-
client, stream, content_type, size)
1090+
client, stream, content_type, size, num_retries)
10701091
blob._do_resumable_upload.assert_not_called()
10711092
else:
10721093
blob._do_multipart_upload.assert_not_called()
10731094
blob._do_resumable_upload.assert_called_once_with(
1074-
client, stream, content_type, size)
1095+
client, stream, content_type, size, num_retries)
10751096

10761097
def test__do_upload_without_chunk_size(self):
10771098
self._do_upload_helper()
@@ -1080,6 +1101,9 @@ def test__do_upload_with_chunk_size(self):
10801101
chunk_size = 1024 * 1024 * 1024 # 1GB
10811102
self._do_upload_helper(chunk_size=chunk_size)
10821103

1104+
def test__do_upload_with_retry(self):
1105+
self._do_upload_helper(num_retries=20)
1106+
10831107
def _upload_from_file_helper(self, side_effect=None, **kwargs):
10841108
from google.cloud._helpers import UTC
10851109

@@ -1109,8 +1133,9 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs):
11091133
self.assertEqual(blob.updated, new_updated)
11101134

11111135
# Check the mock.
1136+
num_retries = kwargs.get('num_retries')
11121137
blob._do_upload.assert_called_once_with(
1113-
client, stream, content_type, len(data))
1138+
client, stream, content_type, len(data), num_retries)
11141139

11151140
return stream
11161141

@@ -1151,10 +1176,11 @@ def _do_upload_mock_call_helper(self, blob, client, content_type, size):
11511176
mock_call = blob._do_upload.mock_calls[0]
11521177
call_name, pos_args, kwargs = mock_call
11531178
self.assertEqual(call_name, '')
1154-
self.assertEqual(len(pos_args), 4)
1179+
self.assertEqual(len(pos_args), 5)
11551180
self.assertEqual(pos_args[0], client)
11561181
self.assertEqual(pos_args[2], content_type)
11571182
self.assertEqual(pos_args[3], size)
1183+
self.assertIsNone(pos_args[4]) # num_retries
11581184
self.assertEqual(kwargs, {})
11591185

11601186
return pos_args[1]

0 commit comments

Comments (0)