Skip to content

Commit d481cb4

Browse files
cojencoparthea
andauthored
feat: handle interrupted downloads with decompressive transcoding (#346)
* feat: handle interrupted downloads with decompressive transcoding * add test * check content-encoding response header Co-authored-by: Anthonios Partheniou <partheniou@google.com>
1 parent 1b58f36 commit d481cb4

5 files changed

Lines changed: 176 additions & 0 deletions

File tree

packages/google-resumable-media/google/resumable_media/_helpers.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
RANGE_HEADER = "range"
3434
CONTENT_RANGE_HEADER = "content-range"
35+
CONTENT_ENCODING_HEADER = "content-encoding"
3536

3637
_SLOW_CRC32C_WARNING = (
3738
"Currently using crcmod in pure python form. This is a slow "
@@ -40,6 +41,8 @@
4041
)
4142
_GENERATION_HEADER = "x-goog-generation"
4243
_HASH_HEADER = "x-goog-hash"
44+
_STORED_CONTENT_ENCODING_HEADER = "x-goog-stored-content-encoding"
45+
4346
_MISSING_CHECKSUM = """\
4447
No {checksum_type} checksum was returned from the service while downloading {}
4548
(which happens for composite objects), so client-side content integrity
@@ -369,6 +372,23 @@ def add_query_parameters(media_url, query_params):
369372
return urlunsplit((scheme, netloc, path, query, frag))
370373

371374

375+
def _is_decompressive_transcoding(response, get_headers):
376+
"""Returns True if the object was served decompressed. This happens when the
377+
"x-goog-stored-content-encoding" header is "gzip" and "content-encoding" header
378+
is not "gzip". See more at: https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip
379+
Args:
380+
response (~requests.Response): The HTTP response object.
381+
get_headers (callable: response->dict): returns response headers.
382+
Returns:
383+
bool: Returns True if decompressive transcoding has occurred; otherwise, False.
384+
"""
385+
headers = get_headers(response)
386+
return (
387+
headers.get(_STORED_CONTENT_ENCODING_HEADER) == "gzip"
388+
and headers.get(CONTENT_ENCODING_HEADER) != "gzip"
389+
)
390+
391+
372392
class _DoNothingHash(object):
373393
"""Do-nothing hash object.
374394

packages/google-resumable-media/google/resumable_media/requests/download.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@
3636
{}
3737
"""
3838

39+
_STREAM_SEEK_ERROR = """\
40+
Incomplete download for:
41+
{}
42+
Error writing to stream while handling a gzip-compressed file download.
43+
Please restart the download.
44+
"""
45+
3946

4047
class Download(_request_helpers.RequestsMixin, _download.Download):
4148
"""Helper to manage downloading a resource from a Google API.
@@ -206,7 +213,18 @@ def retriable_request():
206213

207214
self._process_response(result)
208215

216+
# With decompressive transcoding, GCS serves back the whole file regardless of the range request,
217+
# thus we reset the stream position to the start of the stream.
218+
# See: https://cloud.google.com/storage/docs/transcoding#range
209219
if self._stream is not None:
220+
if _helpers._is_decompressive_transcoding(result, self._get_headers):
221+
try:
222+
self._stream.seek(0)
223+
except Exception as exc:
224+
msg = _STREAM_SEEK_ERROR.format(url)
225+
raise Exception(msg) from exc
226+
self._bytes_downloaded = 0
227+
210228
self._write_to_stream(result)
211229

212230
return result
@@ -379,7 +397,18 @@ def retriable_request():
379397

380398
self._process_response(result)
381399

400+
# With decompressive transcoding, GCS serves back the whole file regardless of the range request,
401+
# thus we reset the stream position to the start of the stream.
402+
# See: https://cloud.google.com/storage/docs/transcoding#range
382403
if self._stream is not None:
404+
if _helpers._is_decompressive_transcoding(result, self._get_headers):
405+
try:
406+
self._stream.seek(0)
407+
except Exception as exc:
408+
msg = _STREAM_SEEK_ERROR.format(url)
409+
raise Exception(msg) from exc
410+
self._bytes_downloaded = 0
411+
383412
self._write_to_stream(result)
384413

385414
return result

packages/google-resumable-media/tests/system/requests/test_download.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,26 @@ def test_download_to_stream(self, add_files, authorized_transport):
294294
assert stream.getvalue() == actual_contents
295295
check_tombstoned(download, authorized_transport)
296296

297+
def test_download_gzip_w_stored_content_headers(
298+
self, add_files, authorized_transport
299+
):
300+
# Retrieve the gzip compressed file
301+
info = ALL_FILES[-1]
302+
actual_contents = self._get_contents(info)
303+
blob_name = get_blob_name(info)
304+
305+
# Create the actual download object.
306+
media_url = utils.DOWNLOAD_URL_TEMPLATE.format(blob_name=blob_name)
307+
stream = io.BytesIO()
308+
download = self._make_one(media_url, stream=stream)
309+
# Consume the resource.
310+
response = download.consume(authorized_transport)
311+
assert response.status_code == http.client.OK
312+
assert response.headers.get(_helpers._STORED_CONTENT_ENCODING_HEADER) == "gzip"
313+
assert response.headers.get("X-Goog-Stored-Content-Length") is not None
314+
assert stream.getvalue() == actual_contents
315+
check_tombstoned(download, authorized_transport)
316+
297317
def test_extra_headers(self, authorized_transport, secret_file):
298318
blob_name, data, headers = secret_file
299319
# Create the actual download object.

packages/google-resumable-media/tests/unit/requests/test_download.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,45 @@ def test_consume_w_bytes_downloaded(self):
395395
range_bytes = "bytes={:d}-{:d}".format(offset, end)
396396
assert download._headers["range"] == range_bytes
397397

398+
def test_consume_gzip_reset_stream_w_bytes_downloaded(self):
399+
stream = io.BytesIO()
400+
chunks = (b"up down ", b"charlie ", b"brown")
401+
end = 65536
402+
403+
download = download_mod.Download(
404+
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
405+
)
406+
transport = mock.Mock(spec=["request"])
407+
408+
# Mock a decompressive transcoding retry operation with bytes already downloaded in the stream
409+
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
410+
transport.request.return_value = _mock_response(chunks=chunks, headers=headers)
411+
offset = 16
412+
download._bytes_downloaded = offset
413+
download.consume(transport)
414+
415+
assert stream.getvalue() == b"".join(chunks)
416+
assert download._bytes_downloaded == len(b"".join(chunks))
417+
418+
def test_consume_gzip_reset_stream_error(self):
419+
stream = io.BytesIO()
420+
chunks = (b"up down ", b"charlie ", b"brown")
421+
end = 65536
422+
423+
download = download_mod.Download(
424+
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
425+
)
426+
transport = mock.Mock(spec=["request"])
427+
428+
# Mock a stream seek error while resuming a decompressive transcoding download
429+
stream.seek = mock.Mock(side_effect=OSError("mock stream seek error"))
430+
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
431+
transport.request.return_value = _mock_response(chunks=chunks, headers=headers)
432+
offset = 16
433+
download._bytes_downloaded = offset
434+
with pytest.raises(Exception):
435+
download.consume(transport)
436+
398437

399438
class TestRawDownload(object):
400439
def test__write_to_stream_no_hash_check(self):
@@ -772,6 +811,49 @@ def test_consume_w_bytes_downloaded(self):
772811
range_bytes = "bytes={:d}-{:d}".format(offset, end)
773812
assert download._headers["range"] == range_bytes
774813

814+
def test_consume_gzip_reset_stream_w_bytes_downloaded(self):
815+
stream = io.BytesIO()
816+
chunks = (b"up down ", b"charlie ", b"brown")
817+
end = 65536
818+
819+
download = download_mod.RawDownload(
820+
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
821+
)
822+
transport = mock.Mock(spec=["request"])
823+
824+
# Mock a decompressive transcoding retry operation with bytes already downloaded in the stream
825+
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
826+
transport.request.return_value = _mock_raw_response(
827+
chunks=chunks, headers=headers
828+
)
829+
offset = 16
830+
download._bytes_downloaded = offset
831+
download.consume(transport)
832+
833+
assert stream.getvalue() == b"".join(chunks)
834+
assert download._bytes_downloaded == len(b"".join(chunks))
835+
836+
def test_consume_gzip_reset_stream_error(self):
837+
stream = io.BytesIO()
838+
chunks = (b"up down ", b"charlie ", b"brown")
839+
end = 65536
840+
841+
download = download_mod.RawDownload(
842+
EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5"
843+
)
844+
transport = mock.Mock(spec=["request"])
845+
846+
# Mock a stream seek error while resuming a decompressive transcoding download
847+
stream.seek = mock.Mock(side_effect=OSError("mock stream seek error"))
848+
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
849+
transport.request.return_value = _mock_raw_response(
850+
chunks=chunks, headers=headers
851+
)
852+
offset = 16
853+
download._bytes_downloaded = offset
854+
with pytest.raises(Exception):
855+
download.consume(transport)
856+
775857

776858
class TestChunkedDownload(object):
777859
@staticmethod

packages/google-resumable-media/tests/unit/test__helpers.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,31 @@ def test_header_value(self):
425425
assert generation_header == self.GENERATION_VALUE
426426

427427

428+
class Test__is_decompressive_transcoding(object):
429+
def test_empty_value(self):
430+
headers = {}
431+
response = _mock_response(headers=headers)
432+
assert _helpers._is_decompressive_transcoding(response, _get_headers) is False
433+
434+
def test_gzip_in_headers(self):
435+
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"}
436+
response = _mock_response(headers=headers)
437+
assert _helpers._is_decompressive_transcoding(response, _get_headers) is True
438+
439+
def test_gzip_not_in_headers(self):
440+
headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "identity"}
441+
response = _mock_response(headers=headers)
442+
assert _helpers._is_decompressive_transcoding(response, _get_headers) is False
443+
444+
def test_gzip_w_content_encoding_in_headers(self):
445+
headers = {
446+
_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip",
447+
_helpers.CONTENT_ENCODING_HEADER: "gzip",
448+
}
449+
response = _mock_response(headers=headers)
450+
assert _helpers._is_decompressive_transcoding(response, _get_headers) is False
451+
452+
428453
class Test__get_generation_from_url(object):
429454

430455
GENERATION_VALUE = 1641590104888641

0 commit comments

Comments
 (0)