From 5665ce9e112f3c3ecdf53cbd55c2ac8d9951e32a Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Wed, 17 Feb 2021 10:44:56 -0800 Subject: [PATCH 01/24] Raise exception rather than swallowing it if there is something wrong in retry stream downloading --- sdk/core/azure-core/CHANGELOG.md | 3 ++ .../azure/core/pipeline/transport/_aiohttp.py | 17 +++++-- .../pipeline/transport/_requests_basic.py | 17 +++++-- .../test_stream_generator_async.py | 50 +++++++++++++++++++ .../azure-core/tests/test_stream_generator.py | 49 +++++++++++++++++- 5 files changed, 125 insertions(+), 11 deletions(-) diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 90e3b12f5520..8e940fe0577d 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -2,6 +2,9 @@ ## 1.11.1 (Unreleased) +### Bug Fixes + +- Raise exception rather than swallowing it if there is something wrong in retry stream downloading #16723 ## 1.11.0 (2021-02-08) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 9468ac9d3756..7c83b49a9d6c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -228,17 +228,24 @@ async def __anext__(self): except _ResponseStopIteration: self.response.internal_response.close() raise StopAsyncIteration() - except (ChunkedEncodingError, ConnectionError): + except (ChunkedEncodingError, ConnectionError) as ex: retry_total -= 1 if retry_total <= 0: retry_active = False else: await asyncio.sleep(retry_interval) headers = {'range': 'bytes=' + str(self.downloaded) + '-'} - resp = await self.pipeline.run(self.request, stream=True, headers=headers) - if resp.http_response.status_code == 416: - raise - chunk = await self.response.internal_response.content.read(self.block_size) + try: + resp = await self.pipeline.run(self.request, stream=True, headers=headers) + if not resp.http_response: + raise + if resp.http_response.status_code == 416: + raise + chunk = await self.response.internal_response.content.read(self.block_size) + except Exception as err: # pylint: disable=broad-except + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise ex if not chunk: raise StopAsyncIteration() self.downloaded += len(chunk) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index a6d8582422bd..421eb5ce7078 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -130,17 +130,24 @@ def __next__(self): self.response.internal_response.close() raise StopIteration() except (requests.exceptions.ChunkedEncodingError, - requests.exceptions.ConnectionError): + requests.exceptions.ConnectionError) as ex: retry_total -= 1 if retry_total <= 0: retry_active = False else: time.sleep(retry_interval) headers = {'range': 'bytes=' + str(self.downloaded) + '-'} - resp = self.pipeline.run(self.request, stream=True, headers=headers) - if resp.http_response.status_code == 416: - raise - chunk = next(self.iter_content_func) + try: + resp = self.pipeline.run(self.request, stream=True, headers=headers) + if not resp.http_response: + raise + if resp.http_response.status_code == 416: + raise + chunk = next(self.iter_content_func) + except Exception as err: # pylint: disable=broad-except + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise ex if not chunk: raise StopIteration() self.downloaded += len(chunk) diff --git a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py index f7368159615f..5cf48746058e 100644 --- a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py +++ b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py @@ -2,10 +2,13 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ +import requests from azure.core.pipeline.transport import ( HttpRequest, AsyncHttpResponse, AsyncHttpTransport, + AsyncioRequestsTransportResponse, + AioHttpTransport, ) from azure.core.pipeline import AsyncPipeline from azure.core.pipeline.transport._aiohttp import AioHttpStreamDownloadGenerator @@ -105,3 +108,50 @@ async def __call__(self, *args, **kwargs): with mock.patch('asyncio.sleep', new_callable=AsyncMock): with pytest.raises(ConnectionError): await stream.__anext__() + +@pytest.mark.asyncio +async def test_response_streaming_error_behavior(): + # Test to reproduce https://github.com/Azure/azure-sdk-for-python/issues/16723 + block_size = 103 + total_response_size = 500 + req_response = requests.Response() + req_request = requests.Request() + + class FakeStreamWithConnectionError: + # fake object for urllib3.response.HTTPResponse + + def stream(self, chunk_size, decode_content=False): + assert chunk_size == block_size + left = total_response_size + while left > 0: + if left <= block_size: + raise ConnectionError() + data = b"X" * min(chunk_size, left) + left -= len(data) + yield data + + def close(self): + pass + + req_response.raw = FakeStreamWithConnectionError() + + response = AsyncioRequestsTransportResponse( + req_request, + req_response, + block_size, + ) + + async def mock_run(self, *args, **kwargs): + return PipelineResponse( + None, + requests.Response(), + None, + ) + + transport = AioHttpTransport() + pipeline = AsyncPipeline(transport) + pipeline.run = mock_run + downloader = response.stream_download(pipeline) + with pytest.raises(ConnectionError): + while True: + await downloader.__anext__() diff --git a/sdk/core/azure-core/tests/test_stream_generator.py b/sdk/core/azure-core/tests/test_stream_generator.py index 4f40bba885eb..bb3c6a079af7 100644 --- a/sdk/core/azure-core/tests/test_stream_generator.py +++ b/sdk/core/azure-core/tests/test_stream_generator.py @@ -7,6 +7,8 @@ HttpRequest, HttpResponse, HttpTransport, + RequestsTransport, + RequestsTransportResponse, ) from azure.core.pipeline import Pipeline, PipelineResponse from azure.core.pipeline.transport._requests_basic import StreamDownloadGenerator @@ -98,4 +100,49 @@ def close(self): stream = StreamDownloadGenerator(pipeline, http_response) with mock.patch('time.sleep', return_value=None): with pytest.raises(requests.exceptions.ConnectionError): - stream.__next__() \ No newline at end of file + stream.__next__() + +def test_response_streaming_error_behavior(): + # Test to reproduce https://github.com/Azure/azure-sdk-for-python/issues/16723 + block_size = 103 + total_response_size = 500 + req_response = requests.Response() + req_request = requests.Request() + + class FakeStreamWithConnectionError: + # fake object for urllib3.response.HTTPResponse + + def stream(self, chunk_size, decode_content=False): + assert chunk_size == block_size + left = total_response_size + while left > 0: + if left <= block_size: + raise requests.exceptions.ConnectionError() + data = b"X" * min(chunk_size, left) + left -= len(data) + yield data + + def close(self): + pass + + req_response.raw = FakeStreamWithConnectionError() + + response = RequestsTransportResponse( + req_request, + req_response, + block_size, + ) + + def mock_run(self, *args, **kwargs): + return PipelineResponse( + None, + requests.Response(), + None, + ) + + transport = RequestsTransport() + pipeline = Pipeline(transport) + pipeline.run = mock_run + downloader = response.stream_download(pipeline) + with pytest.raises(requests.exceptions.ConnectionError): + full_response = b"".join(downloader) From 7ab97d5929771d3ca8f9b116d097166a5036d554 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Fri, 19 Feb 2021 09:35:14 -0800 Subject: [PATCH 02/24] updates --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 2 +- .../azure/core/pipeline/transport/_requests_basic.py | 1 + .../tests/async_tests/test_stream_generator_async.py | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 7c83b49a9d6c..f7843f72e2e7 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -241,7 +241,7 @@ async def __anext__(self): raise if resp.http_response.status_code == 416: raise - chunk = await self.response.internal_response.content.read(self.block_size) + chunk = await resp.http_response.content.read(self.block_size) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Unable to stream download: %s", err) self.response.internal_response.close() diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 421eb5ce7078..8d38818e37d7 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -143,6 +143,7 @@ def __next__(self): raise if resp.http_response.status_code == 416: raise + self.iter_content_func = resp.http_response.iter_content(self.block_size) chunk = next(self.iter_content_func) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Unable to stream download: %s", err) diff --git a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py index 5cf48746058e..aca9bcd3ed25 100644 --- a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py +++ b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py @@ -10,7 +10,7 @@ AsyncioRequestsTransportResponse, AioHttpTransport, ) -from azure.core.pipeline import AsyncPipeline +from azure.core.pipeline import AsyncPipeline, PipelineResponse from azure.core.pipeline.transport._aiohttp import AioHttpStreamDownloadGenerator from unittest import mock import pytest @@ -93,7 +93,7 @@ def __init__(self): self.headers = {} self.content = MockContent() - async def close(self): + def close(self): pass class AsyncMock(mock.MagicMock): From 91936564ac7a711603d12f29bb0edb41429dd48e Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Fri, 19 Feb 2021 09:38:03 -0800 Subject: [PATCH 03/24] update --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index f7843f72e2e7..25b06e50850b 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -241,7 +241,8 @@ async def __anext__(self): raise if resp.http_response.status_code == 416: raise - chunk = await resp.http_response.content.read(self.block_size) + self.response = resp + chunk = await self.response.http_response.content.read(self.block_size) except Exception as err: # pylint: disable=broad-except _LOGGER.warning("Unable to stream download: %s", err) self.response.internal_response.close() From b40285cca82dc58abe3254faa467a116f1fd1b3f Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Fri, 19 Feb 2021 14:43:59 -0800 Subject: [PATCH 04/24] update --- .../azure/core/pipeline/transport/_aiohttp.py | 50 +++++++++-------- .../pipeline/transport/_requests_basic.py | 56 ++++++++++--------- 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 25b06e50850b..496bcd063c96 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -218,46 +218,48 @@ async def __anext__(self): retry_active = True retry_total = 3 retry_interval = 1 # 1 second - while retry_active: - try: - chunk = await self.response.internal_response.content.read(self.block_size) - if not chunk: - raise _ResponseStopIteration() - self.downloaded += self.block_size - return chunk - except _ResponseStopIteration: - self.response.internal_response.close() - raise StopAsyncIteration() - except (ChunkedEncodingError, ConnectionError) as ex: + try: + chunk = await self.response.internal_response.content.read(self.block_size) + if not chunk: + raise _ResponseStopIteration() + self.downloaded += self.block_size + return chunk + except _ResponseStopIteration: + self.response.internal_response.close() + raise StopAsyncIteration() + except (ChunkedEncodingError, ConnectionError) as ex: + while retry_active: retry_total -= 1 if retry_total <= 0: retry_active = False else: await asyncio.sleep(retry_interval) + # todo handle pre-set range & x-ms-range headers = {'range': 'bytes=' + str(self.downloaded) + '-'} try: resp = await self.pipeline.run(self.request, stream=True, headers=headers) if not resp.http_response: - raise + continue if resp.http_response.status_code == 416: - raise + continue self.response = resp chunk = await self.response.http_response.content.read(self.block_size) - except Exception as err: # pylint: disable=broad-except - _LOGGER.warning("Unable to stream download: %s", err) + except StopIteration: self.response.internal_response.close() - raise ex + raise StopIteration() + except Exception: # pylint: disable=broad-except + continue if not chunk: + self.response.internal_response.close() raise StopAsyncIteration() - self.downloaded += len(chunk) + self.downloaded += self.block_size return chunk - continue - except StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise + except StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise class AioHttpTransportResponse(AsyncHttpResponse): """Methods for accessing response body data. diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 8d38818e37d7..289975901e08 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -119,47 +119,51 @@ def __next__(self): retry_active = True retry_total = 3 retry_interval = 1 # 1 second - while retry_active: - try: - chunk = next(self.iter_content_func) - if not chunk: - raise StopIteration() - self.downloaded += self.block_size - return chunk - except StopIteration: - self.response.internal_response.close() + try: + chunk = next(self.iter_content_func) + if not chunk: raise StopIteration() - except (requests.exceptions.ChunkedEncodingError, - requests.exceptions.ConnectionError) as ex: + self.downloaded += self.block_size + return chunk + except StopIteration: + self.response.internal_response.close() + raise StopIteration() + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError) as ex: + while retry_active: retry_total -= 1 if retry_total <= 0: - retry_active = False + _LOGGER.warning("Unable to stream download: %s", err) + raise ex else: time.sleep(retry_interval) + # todo handle pre-set range & x-ms-range headers = {'range': 'bytes=' + str(self.downloaded) + '-'} try: resp = self.pipeline.run(self.request, stream=True, headers=headers) if not resp.http_response: - raise + continue if resp.http_response.status_code == 416: - raise + continue + self.response = resp self.iter_content_func = resp.http_response.iter_content(self.block_size) chunk = next(self.iter_content_func) - except Exception as err: # pylint: disable=broad-except - _LOGGER.warning("Unable to stream download: %s", err) + self.downloaded += self.block_size + return chunk + except StopIteration: self.response.internal_response.close() - raise ex + raise StopIteration() + except Exception: # pylint: disable=broad-except + continue if not chunk: + self.response.internal_response.close() raise StopIteration() - self.downloaded += len(chunk) - return chunk - continue - except requests.exceptions.StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise + except requests.exceptions.StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise next = __next__ # Python 2 compatibility. From a7ac8aa2af31503a402faece983c7b4571c9835e Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Fri, 19 Feb 2021 15:10:38 -0800 Subject: [PATCH 05/24] update --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 3 ++- .../azure/core/pipeline/transport/_requests_basic.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 496bcd063c96..24538e171261 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -231,7 +231,8 @@ async def __anext__(self): while retry_active: retry_total -= 1 if retry_total <= 0: - retry_active = False + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex else: await asyncio.sleep(retry_interval) # todo handle pre-set range & x-ms-range diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 289975901e08..6557748c95ae 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -133,7 +133,7 @@ def __next__(self): while retry_active: retry_total -= 1 if retry_total <= 0: - _LOGGER.warning("Unable to stream download: %s", err) + _LOGGER.warning("Unable to stream download: %s", ex) raise ex else: time.sleep(retry_interval) From 213edfb1c5a27fc3458224569ad9e7eff95e08d4 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Tue, 23 Feb 2021 15:31:44 -0800 Subject: [PATCH 06/24] update --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 3 ++- .../azure/core/pipeline/transport/_requests_basic.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 24538e171261..f21497330b60 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -236,7 +236,8 @@ async def __anext__(self): else: await asyncio.sleep(retry_interval) # todo handle pre-set range & x-ms-range - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} + headers = self.request.headers + headers.update({'range': 'bytes=' + str(self.downloaded) + '-'}) try: resp = await self.pipeline.run(self.request, stream=True, headers=headers) if not resp.http_response: diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 6557748c95ae..3d977e291deb 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -138,7 +138,8 @@ def __next__(self): else: time.sleep(retry_interval) # todo handle pre-set range & x-ms-range - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} + headers = self.request.headers + headers.update( {'range': 'bytes=' + str(self.downloaded) + '-'}) try: resp = self.pipeline.run(self.request, stream=True, headers=headers) if not resp.http_response: From 95a867ae37fcbb4d86f03a09df141bf67d464dd7 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Tue, 23 Feb 2021 17:39:41 -0800 Subject: [PATCH 07/24] updates --- .../pipeline/transport/_requests_basic.py | 25 +++++ .../azure-core/tests/test_range_header.py | 93 +++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 sdk/core/azure-core/tests/test_range_header.py diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 3d977e291deb..f438b0eea382 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -27,6 +27,7 @@ import logging from typing import Iterator, Optional, Any, Union, TypeVar import time +import six import urllib3 # type: ignore from urllib3.util.retry import Retry # type: ignore import requests @@ -49,6 +50,30 @@ _LOGGER = logging.getLogger(__name__) +def parse_range_header(header_value, header='Range'): + if not isinstance(header_value, six.string_types) or len(header_value) < len(header): + raise ValueError("Invalid header") + pos = header_value.index(":") + range_value = header_value[pos+1:].strip() + if not range_value.startswith("bytes="): + raise ValueError("Invalid header") + range = range_value[6:] + ret = range.split("-") + if len(ret) < 2: + raise ValueError("Invalid header") + start = int(ret[0]) if ret[0] else -1 + end = int(ret[1]) if ret[1] else -1 + return (start, end) + +def make_range_header(original_range, downloaded_size=0, header='Range'): + if original_range[0] == -1: + end = original_range[1] - downloaded_size + return header + ": bytes=-" + str(end) + start = original_range[0] + downloaded_size + if original_range[1] == -1: + return header + ": bytes=" + str(start) + "-" + return header + ": bytes=" + str(start) + "-" + str(original_range[1]) + class _RequestsTransportResponseBase(_HttpResponseBase): """Base class for accessing response data. diff --git a/sdk/core/azure-core/tests/test_range_header.py b/sdk/core/azure-core/tests/test_range_header.py new file mode 100644 index 000000000000..0f783d58aca0 --- /dev/null +++ b/sdk/core/azure-core/tests/test_range_header.py @@ -0,0 +1,93 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# +# -------------------------------------------------------------------------- + +from azure.core.pipeline.transport._requests_basic import parse_range_header, make_range_header + +def test_basic_range_header(): + range_header = "Range: bytes=0-500" + range = parse_range_header(range_header) + assert range == (0,500) + +def test_no_start_range_header(): + range_header = "Range: bytes=-500" + range = parse_range_header(range_header) + assert range == (-1,500) + +def test_no_end_range_header(): + range_header = "Range: bytes=0-" + range = parse_range_header(range_header) + assert range == (0,-1) + +def test_basic_range_ms_header(): + range_header = "x_ms_range: bytes=0-500" + range = parse_range_header(range_header) + assert range == (0,500) + +def test_no_start_range_ms_header(): + range_header = "x_ms_range: bytes=-500" + range = parse_range_header(range_header) + assert range == (-1,500) + +def test_no_end_range_ms_header(): + range_header = "x_ms_range: bytes=0-" + range = parse_range_header(range_header) + assert range == (0,-1) + +def test_make_basic_range_header(): + range_header = "Range: bytes=0-500" + range = parse_range_header(range_header) + header = make_range_header(range, 100) + assert header == "Range: bytes=100-500" + +def test_make_no_start_range_header(): + range_header = "Range: bytes=-500" + range = parse_range_header(range_header) + header = make_range_header(range, 100) + assert header == "Range: bytes=-400" + +def test_make_no_end_range_header(): + range_header = "Range: bytes=0-" + range = parse_range_header(range_header) + header = make_range_header(range, 100) + assert header == "Range: bytes=100-" + +def test_make_basic_range_ms_header(): + range_header = "x_ms_range: bytes=0-500" + range = parse_range_header(range_header) + header = make_range_header(range, 100, "x_ms_range") + assert header == "x_ms_range: bytes=100-500" + +def test_make_no_start_range_ms_header(): + range_header = "x_ms_range: bytes=-500" + range = parse_range_header(range_header) + header = make_range_header(range, 100, "x_ms_range") + assert header == "x_ms_range: bytes=-400" + +def test_make_no_end_range_ms_header(): + range_header = "x_ms_range: bytes=0-" + range = parse_range_header(range_header) + header = make_range_header(range, 100, "x_ms_range") + assert header == "x_ms_range: bytes=100-" From 00e5f77f9e7c863f145b6d4fadaec20f06328c98 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Tue, 23 Feb 2021 17:45:08 -0800 Subject: [PATCH 08/24] update --- .../pipeline/transport/_requests_basic.py | 15 +++--- .../azure-core/tests/test_range_header.py | 51 ++++--------------- 2 files changed, 15 insertions(+), 51 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index f438b0eea382..7d3ac7896659 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -50,11 +50,8 @@ _LOGGER = logging.getLogger(__name__) -def parse_range_header(header_value, header='Range'): - if not isinstance(header_value, six.string_types) or len(header_value) < len(header): - raise ValueError("Invalid header") - pos = header_value.index(":") - range_value = header_value[pos+1:].strip() +def parse_range_header(header_value): + range_value = header_value.strip() if not range_value.startswith("bytes="): raise ValueError("Invalid header") range = range_value[6:] @@ -65,14 +62,14 @@ def parse_range_header(header_value, header='Range'): end = int(ret[1]) if ret[1] else -1 return (start, end) -def make_range_header(original_range, downloaded_size=0, header='Range'): +def make_range_header(original_range, downloaded_size=0): if original_range[0] == -1: end = original_range[1] - downloaded_size - return header + ": bytes=-" + str(end) + return "bytes=-" + str(end) start = original_range[0] + downloaded_size if original_range[1] == -1: - return header + ": bytes=" + str(start) + "-" - return header + ": bytes=" + str(start) + "-" + str(original_range[1]) + return "bytes=" + str(start) + "-" + return "bytes=" + str(start) + "-" + str(original_range[1]) class _RequestsTransportResponseBase(_HttpResponseBase): diff --git a/sdk/core/azure-core/tests/test_range_header.py b/sdk/core/azure-core/tests/test_range_header.py index 0f783d58aca0..c156ad19607f 100644 --- a/sdk/core/azure-core/tests/test_range_header.py +++ b/sdk/core/azure-core/tests/test_range_header.py @@ -27,67 +27,34 @@ from azure.core.pipeline.transport._requests_basic import parse_range_header, make_range_header def test_basic_range_header(): - range_header = "Range: bytes=0-500" + range_header = "bytes=0-500" range = parse_range_header(range_header) assert range == (0,500) def test_no_start_range_header(): - range_header = "Range: bytes=-500" + range_header = "bytes=-500" range = parse_range_header(range_header) assert range == (-1,500) def test_no_end_range_header(): - range_header = "Range: bytes=0-" - range = parse_range_header(range_header) - assert range == (0,-1) - -def test_basic_range_ms_header(): - range_header = "x_ms_range: bytes=0-500" - range = parse_range_header(range_header) - assert range == (0,500) - -def test_no_start_range_ms_header(): - range_header = "x_ms_range: bytes=-500" - range = parse_range_header(range_header) - assert range == (-1,500) - -def test_no_end_range_ms_header(): - range_header = "x_ms_range: bytes=0-" + range_header = "bytes=0-" range = parse_range_header(range_header) assert range == (0,-1) def test_make_basic_range_header(): - range_header = "Range: bytes=0-500" + range_header = "bytes=0-500" range = parse_range_header(range_header) header = make_range_header(range, 100) - assert header == "Range: bytes=100-500" + assert header == "bytes=100-500" def test_make_no_start_range_header(): - range_header = "Range: bytes=-500" + range_header = "bytes=-500" range = parse_range_header(range_header) header = make_range_header(range, 100) - assert header == "Range: bytes=-400" + assert header == "bytes=-400" def test_make_no_end_range_header(): - range_header = "Range: bytes=0-" + range_header = "bytes=0-" range = parse_range_header(range_header) header = make_range_header(range, 100) - assert header == "Range: bytes=100-" - -def test_make_basic_range_ms_header(): - range_header = "x_ms_range: bytes=0-500" - range = parse_range_header(range_header) - header = make_range_header(range, 100, "x_ms_range") - assert header == "x_ms_range: bytes=100-500" - -def test_make_no_start_range_ms_header(): - range_header = "x_ms_range: bytes=-500" - range = parse_range_header(range_header) - header = make_range_header(range, 100, "x_ms_range") - assert header == "x_ms_range: bytes=-400" - -def test_make_no_end_range_ms_header(): - range_header = "x_ms_range: bytes=0-" - range = parse_range_header(range_header) - header = make_range_header(range, 100, "x_ms_range") - assert header == "x_ms_range: bytes=100-" + assert header == "bytes=100-" From 83d42190ee949253f2015f77edfd279a70964359 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Tue, 23 Feb 2021 17:57:30 -0800 Subject: [PATCH 09/24] update --- .../pipeline/transport/_requests_basic.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 7d3ac7896659..2b8e03e6ba2a 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -130,6 +130,15 @@ def __init__(self, pipeline, response): self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 + headers = response.request.headers + if "x-ms-range" in headers: + self.range_header = "x-ms-range" + self.range = headers["x-ms-range"] + elif "Range" in headers: + self.range_header = "Range" + self.range = headers["Range"] + if 'etag' in response.headers: + self.etag = response.headers['etag'] def __len__(self): return self.content_length @@ -161,13 +170,19 @@ def __next__(self): time.sleep(retry_interval) # todo handle pre-set range & x-ms-range headers = self.request.headers - headers.update( {'range': 'bytes=' + str(self.downloaded) + '-'}) + if not self.range_header: + range_header = {'range': 'bytes=' + str(self.downloaded) + '-'} + else: + range_header = {self.range_header: make_range_header(self.range, self.downloaded)} + if self.etag: + range_header.update({'If-Match': self.etag}) + headers.update(range_header) try: resp = self.pipeline.run(self.request, stream=True, headers=headers) if not resp.http_response: continue - if resp.http_response.status_code == 416: - continue + if resp.http_response.status_code == 412: + raise ex self.response = resp self.iter_content_func = resp.http_response.iter_content(self.block_size) chunk = next(self.iter_content_func) From 0ede2c9b9bd9c4a287fdb4ac11327e36a6d32d16 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Tue, 23 Feb 2021 18:00:22 -0800 Subject: [PATCH 10/24] update --- .../azure/core/pipeline/transport/_requests_basic.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 2b8e03e6ba2a..d0279983b76f 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -137,8 +137,12 @@ def __init__(self, pipeline, response): elif "Range" in headers: self.range_header = "Range" self.range = headers["Range"] + else: + self.range_header = None if 'etag' in response.headers: self.etag = response.headers['etag'] + else: + self.etag = None def __len__(self): return self.content_length From aff402b270fcea57377d69854dbec941babe2960 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 09:40:10 -0800 Subject: [PATCH 11/24] update --- .../azure/core/pipeline/transport/_requests_basic.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index d0279983b76f..b820f8ce91c5 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -130,7 +130,7 @@ def __init__(self, pipeline, response): self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 - headers = response.request.headers + headers = response.headers if "x-ms-range" in headers: self.range_header = "x-ms-range" self.range = headers["x-ms-range"] @@ -139,7 +139,7 @@ def __init__(self, pipeline, response): self.range = headers["Range"] else: self.range_header = None - if 'etag' in response.headers: + if 'etag' in headers: self.etag = response.headers['etag'] else: self.etag = None @@ -171,15 +171,17 @@ def __next__(self): _LOGGER.warning("Unable to stream download: %s", ex) raise ex else: + if not self.etag: + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex time.sleep(retry_interval) # todo handle pre-set range & x-ms-range - headers = self.request.headers + headers = self.request.headers.copy() if not self.range_header: range_header = {'range': 'bytes=' + str(self.downloaded) + '-'} else: range_header = {self.range_header: make_range_header(self.range, self.downloaded)} - if self.etag: - range_header.update({'If-Match': self.etag}) + range_header.update({'If-Match': self.etag}) headers.update(range_header) try: resp = self.pipeline.run(self.request, stream=True, headers=headers) From 35256ac0f366f0036d6a9d9a670893bf6bd0b615 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 13:44:04 -0800 Subject: [PATCH 12/24] updates --- .../azure/core/pipeline/transport/_aiohttp.py | 55 +++++++---- .../azure/core/pipeline/transport/_base.py | 23 +++++ .../pipeline/transport/_requests_asyncio.py | 92 +++++++++++------- .../pipeline/transport/_requests_basic.py | 84 ++++++---------- .../core/pipeline/transport/_requests_trio.py | 97 ++++++++++++------- 5 files changed, 208 insertions(+), 143 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index f21497330b60..56712f704c4c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -39,7 +39,7 @@ from azure.core.exceptions import ServiceRequestError, ServiceResponseError from azure.core.pipeline import Pipeline -from ._base import HttpRequest +from ._base import HttpRequest, parse_range_header, make_range_header from ._base_async import ( AsyncHttpTransport, AsyncHttpResponse, @@ -210,6 +210,16 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.block_size = response.block_size self.content_length = int(response.internal_response.headers.get('Content-Length', 0)) self.downloaded = 0 + headers = response.headers + if "x-ms-range" in headers: + self.range_header = "x-ms-range" + self.range = headers["x-ms-range"] + elif "Range" in headers: + self.range_header = "Range" + self.range = headers["Range"] + else: + self.range_header = None + self.etag = response.headers['etag'] if 'etag' in headers else None def __len__(self): return self.content_length @@ -225,7 +235,7 @@ async def __anext__(self): self.downloaded += self.block_size return chunk except _ResponseStopIteration: - self.response.internal_response.close() + await self.response.internal_response.close() raise StopAsyncIteration() except (ChunkedEncodingError, ConnectionError) as ex: while retry_active: @@ -233,34 +243,39 @@ async def __anext__(self): if retry_total <= 0: _LOGGER.warning("Unable to stream download: %s", ex) raise ex + if not self.etag: + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex + await asyncio.sleep(retry_interval) + headers = self.request.headers.copy() + if not self.range_header: + range_header = {'range': 'bytes=' + str(self.downloaded) + '-'} else: - await asyncio.sleep(retry_interval) - # todo handle pre-set range & x-ms-range - headers = self.request.headers - headers.update({'range': 'bytes=' + str(self.downloaded) + '-'}) - try: - resp = await self.pipeline.run(self.request, stream=True, headers=headers) - if not resp.http_response: - continue - if resp.http_response.status_code == 416: - continue - self.response = resp - chunk = await self.response.http_response.content.read(self.block_size) - except StopIteration: - self.response.internal_response.close() - raise StopIteration() - except Exception: # pylint: disable=broad-except + range_header = {self.range_header: make_range_header(self.range, self.downloaded)} + range_header.update({'If-Match': self.etag}) + headers.update(range_header) + try: + resp = await self.pipeline.run(self.request, stream=True, headers=headers) + if not resp.http_response: + continue + if resp.http_response.status_code == 412: continue + self.response = resp.http_response + chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: - self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk + except StopAsyncIteration: + await self.response.internal_response.close() + raise StopAsyncIteration() + except Exception: # pylint: disable=broad-except + continue except StreamConsumedError: raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() + await self.response.internal_response.close() raise class AioHttpTransportResponse(AsyncHttpResponse): diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_base.py b/sdk/core/azure-core/azure/core/pipeline/transport/_base.py index c77212d33e69..6ad405850735 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_base.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_base.py @@ -963,3 +963,26 @@ def options(self, url, params=None, headers=None, **kwargs): "OPTIONS", url, params, headers, content, form_content, None ) return request + + +def parse_range_header(header_value): + range_value = header_value.strip() + if not range_value.startswith("bytes="): + raise ValueError("Invalid header") + range = range_value[6:] + ret = range.split("-") + if len(ret) < 2: + raise ValueError("Invalid header") + start = int(ret[0]) if ret[0] else -1 + end = int(ret[1]) if ret[1] else -1 + return (start, end) + +def make_range_header(original_range, downloaded_size=0): + if original_range[0] == -1: + end = original_range[1] - downloaded_size + return "bytes=-" + str(end) + start = original_range[0] + downloaded_size + if original_range[1] == -1: + return "bytes=" + str(start) + "-" + return "bytes=" + str(start) + "-" + str(original_range[1]) + diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index 087a373b0732..0eb45f591089 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -37,7 +37,7 @@ ServiceResponseError ) from azure.core.pipeline import Pipeline -from ._base import HttpRequest +from ._base import HttpRequest, parse_range_header, make_range_header from ._base_async import ( AsyncHttpResponse, _ResponseStopIteration, @@ -149,6 +149,16 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 + headers = response.headers + if "x-ms-range" in headers: + self.range_header = "x-ms-range" + self.range = headers["x-ms-range"] + elif "Range" in headers: + self.range_header = "Range" + self.range = headers["Range"] + else: + self.range_header = None + self.etag = response.headers['etag'] if 'etag' in headers else None def __len__(self): return self.content_length @@ -158,48 +168,64 @@ async def __anext__(self): retry_active = True retry_total = 3 retry_interval = 1 # 1 second - while retry_active: - try: - chunk = await loop.run_in_executor( - None, - _iterate_response_content, - self.iter_content_func, - ) - if not chunk: - raise _ResponseStopIteration() - self.downloaded += self.block_size - return chunk - except _ResponseStopIteration: - self.response.internal_response.close() - raise StopAsyncIteration() - except (requests.exceptions.ChunkedEncodingError, - requests.exceptions.ConnectionError): + try: + chunk = await loop.run_in_executor( + None, + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise _ResponseStopIteration() + self.downloaded += self.block_size + return chunk + except _ResponseStopIteration: + await self.response.internal_response.close() + raise StopAsyncIteration() + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError) as ex: + while retry_active: retry_total -= 1 if retry_total <= 0: - retry_active = False + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex + if not self.etag: + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex + await asyncio.sleep(retry_interval) + headers = self.request.headers.copy() + if not self.range_header: + range_header = {'range': 'bytes=' + str(self.downloaded) + '-'} else: - await asyncio.sleep(retry_interval) - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} - resp = self.pipeline.run(self.request, stream=True, headers=headers) - if resp.status_code == 416: - raise + range_header = {self.range_header: make_range_header(self.range, self.downloaded)} + range_header.update({'If-Match': self.etag}) + headers.update(range_header) + try: + resp = await self.pipeline.run(self.request, stream=True, headers=headers) + if not resp.http_response: + continue + if resp.http_response.status_code == 412: + continue + self.response = resp.http_response chunk = await loop.run_in_executor( None, _iterate_response_content, self.iter_content_func, ) if not chunk: - raise StopIteration() - self.downloaded += len(chunk) + raise StopAsyncIteration() + self.downloaded += self.block_size return chunk - continue - except requests.exceptions.StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise - + except StopAsyncIteration: + await self.response.internal_response.close() + raise StopAsyncIteration() + except Exception: # pylint: disable=broad-except + continue + except requests.exceptions.StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + await self.response.internal_response.close() + raise class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore """Asynchronous streaming of data from the response. diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index b820f8ce91c5..5b710d309b7d 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -42,7 +42,9 @@ from ._base import ( HttpTransport, HttpResponse, - _HttpResponseBase + _HttpResponseBase, + make_range_header, + parse_range_header, ) from ._bigger_block_size_http_adapters import BiggerBlockSizeHTTPAdapter @@ -50,28 +52,6 @@ _LOGGER = logging.getLogger(__name__) -def parse_range_header(header_value): - range_value = header_value.strip() - if not range_value.startswith("bytes="): - raise ValueError("Invalid header") - range = range_value[6:] - ret = range.split("-") - if len(ret) < 2: - raise ValueError("Invalid header") - start = int(ret[0]) if ret[0] else -1 - end = int(ret[1]) if ret[1] else -1 - return (start, end) - -def make_range_header(original_range, downloaded_size=0): - if original_range[0] == -1: - end = original_range[1] - downloaded_size - return "bytes=-" + str(end) - start = original_range[0] + downloaded_size - if original_range[1] == -1: - return "bytes=" + str(start) + "-" - return "bytes=" + str(start) + "-" + str(original_range[1]) - - class _RequestsTransportResponseBase(_HttpResponseBase): """Base class for accessing response data. @@ -139,10 +119,7 @@ def __init__(self, pipeline, response): self.range = headers["Range"] else: self.range_header = None - if 'etag' in headers: - self.etag = response.headers['etag'] - else: - self.etag = None + self.etag = response.headers['etag'] if 'etag' in headers else None def __len__(self): return self.content_length @@ -170,38 +147,35 @@ def __next__(self): if retry_total <= 0: _LOGGER.warning("Unable to stream download: %s", ex) raise ex + if not self.etag: + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex + time.sleep(retry_interval) + headers = self.request.headers.copy() + if not self.range_header: + range_header = {'range': 'bytes=' + str(self.downloaded) + '-'} else: - if not self.etag: - _LOGGER.warning("Unable to stream download: %s", ex) - raise ex - time.sleep(retry_interval) - # todo handle pre-set range & x-ms-range - headers = self.request.headers.copy() - if not self.range_header: - range_header = {'range': 'bytes=' + str(self.downloaded) + '-'} - else: - range_header = {self.range_header: make_range_header(self.range, self.downloaded)} - range_header.update({'If-Match': self.etag}) - headers.update(range_header) - try: - resp = self.pipeline.run(self.request, stream=True, headers=headers) - if not resp.http_response: - continue - if resp.http_response.status_code == 412: - raise ex - self.response = resp - self.iter_content_func = resp.http_response.iter_content(self.block_size) - chunk = next(self.iter_content_func) - self.downloaded += self.block_size - return chunk - except StopIteration: - self.response.internal_response.close() - raise StopIteration() - except Exception: # pylint: disable=broad-except + range_header = {self.range_header: make_range_header(self.range, self.downloaded)} + range_header.update({'If-Match': self.etag}) + headers.update(range_header) + try: + resp = self.pipeline.run(self.request, stream=True, headers=headers) + if not resp.http_response: continue + if resp.http_response.status_code == 412: + raise ex + self.response = resp.http_response + self.iter_content_func = self.response.internal_response.iter_content(self.block_size) + chunk = next(self.iter_content_func) if not chunk: - self.response.internal_response.close() raise StopIteration() + self.downloaded += self.block_size + return chunk + except StopIteration: + self.response.internal_response.close() + raise StopIteration() + except Exception: # pylint: disable=broad-except + continue except requests.exceptions.StreamConsumedError: raise except Exception as err: diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index bfc28735df5c..5a112d91d265 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -37,7 +37,7 @@ ServiceResponseError ) from azure.core.pipeline import Pipeline -from ._base import HttpRequest +from ._base import HttpRequest, parse_range_header, make_range_header from ._base_async import ( AsyncHttpResponse, _ResponseStopIteration, @@ -63,6 +63,16 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 + headers = response.headers + if "x-ms-range" in headers: + self.range_header = "x-ms-range" + self.range = headers["x-ms-range"] + elif "Range" in headers: + self.range_header = "Range" + self.range = headers["Range"] + else: + self.range_header = None + self.etag = response.headers['etag'] if 'etag' in headers else None def __len__(self): return self.content_length @@ -70,36 +80,49 @@ def __len__(self): async def __anext__(self): retry_active = True retry_total = 3 - while retry_active: + retry_interval = 1 # 1 second + try: try: - try: - chunk = await trio.to_thread.run_sync( - _iterate_response_content, - self.iter_content_func, - ) - except AttributeError: # trio < 0.12.1 - chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member - _iterate_response_content, - self.iter_content_func, - ) - if not chunk: - raise _ResponseStopIteration() - self.downloaded += self.block_size - return chunk - except _ResponseStopIteration: - self.response.internal_response.close() - raise StopAsyncIteration() - except (requests.exceptions.ChunkedEncodingError, - requests.exceptions.ConnectionError): + chunk = await trio.to_thread.run_sync( + _iterate_response_content, + self.iter_content_func, + ) + except AttributeError: # trio < 0.12.1 + chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise _ResponseStopIteration() + self.downloaded += self.block_size + return chunk + except _ResponseStopIteration: + await self.response.internal_response.close() + raise StopAsyncIteration() + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError) as ex: + while retry_active: retry_total -= 1 if retry_total <= 0: - retry_active = False + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex + if not self.etag: + _LOGGER.warning("Unable to stream download: %s", ex) + raise ex + await trio.sleep(retry_interval) + headers = self.request.headers.copy() + if not self.range_header: + range_header = {'range': 'bytes=' + str(self.downloaded) + '-'} else: - await trio.sleep(1) - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} + range_header = {self.range_header: make_range_header(self.range, self.downloaded)} + range_header.update({'If-Match': self.etag}) + headers.update(range_header) + try: resp = self.pipeline.run(self.request, stream=True, headers=headers) - if resp.status_code == 416: - raise + if not resp.http_response: + continue + if resp.http_response.status_code == 412: + continue try: chunk = await trio.to_thread.run_sync( _iterate_response_content, @@ -111,16 +134,20 @@ async def __anext__(self): self.iter_content_func, ) if not chunk: - raise StopIteration() - self.downloaded += len(chunk) + raise StopAsyncIteration() + self.downloaded += self.block_size return chunk - continue - except requests.exceptions.StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise + except StopAsyncIteration: + await self.response.internal_response.close() + raise StopAsyncIteration() + except Exception: # pylint: disable=broad-except + continue + except requests.exceptions.StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + await self.response.internal_response.close() + raise class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore """Asynchronous streaming of data from the response. From c73f6a63018b834719d51a3f90f5e13952b8eca8 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 14:00:12 -0800 Subject: [PATCH 13/24] updates --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 56712f704c4c..b0184ee23a0c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -235,7 +235,8 @@ async def __anext__(self): self.downloaded += self.block_size return chunk except _ResponseStopIteration: - await self.response.internal_response.close() + if self.response.internal_response: + await self.response.internal_response.close() raise StopAsyncIteration() except (ChunkedEncodingError, ConnectionError) as ex: while retry_active: From 526fca445ab20e2cbec8c9006b033745c6a087b7 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 14:29:49 -0800 Subject: [PATCH 14/24] update --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 1 - .../azure/core/pipeline/transport/_requests_asyncio.py | 1 - .../azure-core/azure/core/pipeline/transport/_requests_basic.py | 1 - .../azure-core/azure/core/pipeline/transport/_requests_trio.py | 1 - 4 files changed, 4 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index b0184ee23a0c..cda382de3af6 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -276,7 +276,6 @@ async def __anext__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - await self.response.internal_response.close() raise class AioHttpTransportResponse(AsyncHttpResponse): diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index 0eb45f591089..c3c68bc3b6f4 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -224,7 +224,6 @@ async def __anext__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - await self.response.internal_response.close() raise class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 5b710d309b7d..0428def75a6f 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -180,7 +180,6 @@ def __next__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() raise next = __next__ # Python 2 compatibility. diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index 5a112d91d265..7e6ff35997ed 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -146,7 +146,6 @@ async def __anext__(self): raise except Exception as err: _LOGGER.warning("Unable to stream download: %s", err) - await self.response.internal_response.close() raise class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore From 76a4186a1294f43190393f59792b193dc10b684c Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 14:48:55 -0800 Subject: [PATCH 15/24] updates --- .../azure-core/azure/core/pipeline/transport/_aiohttp.py | 5 ++--- .../azure/core/pipeline/transport/_requests_asyncio.py | 3 +-- .../azure/core/pipeline/transport/_requests_basic.py | 4 ++-- .../azure/core/pipeline/transport/_requests_trio.py | 4 ++-- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index cda382de3af6..6149b2e7fb98 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -231,12 +231,11 @@ async def __anext__(self): try: chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: + await self.response.internal_response.close() raise _ResponseStopIteration() self.downloaded += self.block_size return chunk except _ResponseStopIteration: - if self.response.internal_response: - await self.response.internal_response.close() raise StopAsyncIteration() except (ChunkedEncodingError, ConnectionError) as ex: while retry_active: @@ -264,11 +263,11 @@ async def __anext__(self): self.response = resp.http_response chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: + await self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk except StopAsyncIteration: - await self.response.internal_response.close() raise StopAsyncIteration() except Exception: # pylint: disable=broad-except continue diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index c3c68bc3b6f4..dc6903399ad1 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -179,7 +179,6 @@ async def __anext__(self): self.downloaded += self.block_size return chunk except _ResponseStopIteration: - await self.response.internal_response.close() raise StopAsyncIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as ex: @@ -212,11 +211,11 @@ async def __anext__(self): self.iter_content_func, ) if not chunk: + await self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk except StopAsyncIteration: - await self.response.internal_response.close() raise StopAsyncIteration() except Exception: # pylint: disable=broad-except continue diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 0428def75a6f..6fcf1be8107c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -134,11 +134,11 @@ def __next__(self): try: chunk = next(self.iter_content_func) if not chunk: + self.response.internal_response.close() raise StopIteration() self.downloaded += self.block_size return chunk except StopIteration: - self.response.internal_response.close() raise StopIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as ex: @@ -168,11 +168,11 @@ def __next__(self): self.iter_content_func = self.response.internal_response.iter_content(self.block_size) chunk = next(self.iter_content_func) if not chunk: + self.response.internal_response.close() raise StopIteration() self.downloaded += self.block_size return chunk except StopIteration: - self.response.internal_response.close() raise StopIteration() except Exception: # pylint: disable=broad-except continue diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index 7e6ff35997ed..c91c95ade95c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -93,11 +93,11 @@ async def __anext__(self): self.iter_content_func, ) if not chunk: + await self.response.internal_response.close() raise _ResponseStopIteration() self.downloaded += self.block_size return chunk except _ResponseStopIteration: - await self.response.internal_response.close() raise StopAsyncIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as ex: @@ -134,11 +134,11 @@ async def __anext__(self): self.iter_content_func, ) if not chunk: + await self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk except StopAsyncIteration: - await self.response.internal_response.close() raise StopAsyncIteration() except Exception: # pylint: disable=broad-except continue From 3d94d8ddb82ea1300d735fe9d95457fe0c80db6f Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 14:49:12 -0800 Subject: [PATCH 16/24] updates --- .../test_stream_generator_async.py | 23 +++++++++++-------- .../azure-core/tests/test_stream_generator.py | 18 ++++++++++----- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py index aca9bcd3ed25..854d0fdd752c 100644 --- a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py +++ b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py @@ -18,8 +18,8 @@ @pytest.mark.asyncio async def test_connection_error_response(): class MockTransport(AsyncHttpTransport): - def __init__(self): - self._count = 0 + def __init__(self, error=True): + self._error = error async def __aexit__(self, exc_type, exc_val, exc_tb): pass @@ -32,22 +32,24 @@ async def send(self, request, **kwargs): request = HttpRequest('GET', 'http://127.0.0.1/') response = AsyncHttpResponse(request, None) response.status_code = 200 + response.internal_response = MockInternalResponse(error=False) return response class MockContent(): - def __init__(self): - self._first = True + def __init__(self, error=True): + self._error = error async def read(self, block_size): - if self._first: - self._first = False + if self._error: raise ConnectionError return None class MockInternalResponse(): - def __init__(self): + def __init__(self, error=True): self.headers = {} - self.content = MockContent() + self._error = error + self.content = MockContent(error=self._error) + self.status_code = 200 async def close(self): pass @@ -60,6 +62,7 @@ async def __call__(self, *args, **kwargs): pipeline = AsyncPipeline(MockTransport()) http_response = AsyncHttpResponse(http_request, None) http_response.internal_response = MockInternalResponse() + http_response.headers['etag'] = "etag" stream = AioHttpStreamDownloadGenerator(pipeline, http_response) with mock.patch('asyncio.sleep', new_callable=AsyncMock): with pytest.raises(StopAsyncIteration): @@ -125,7 +128,7 @@ def stream(self, chunk_size, decode_content=False): left = total_response_size while left > 0: if left <= block_size: - raise ConnectionError() + raise requests.exceptions.ConnectionError() data = b"X" * min(chunk_size, left) left -= len(data) yield data @@ -152,6 +155,6 @@ async def mock_run(self, *args, **kwargs): pipeline = AsyncPipeline(transport) pipeline.run = mock_run downloader = response.stream_download(pipeline) - with pytest.raises(ConnectionError): + with pytest.raises(requests.exceptions.ConnectionError): while True: await downloader.__anext__() diff --git a/sdk/core/azure-core/tests/test_stream_generator.py b/sdk/core/azure-core/tests/test_stream_generator.py index bb3c6a079af7..5e54a595a477 100644 --- a/sdk/core/azure-core/tests/test_stream_generator.py +++ b/sdk/core/azure-core/tests/test_stream_generator.py @@ -20,8 +20,8 @@ def test_connection_error_response(): class MockTransport(HttpTransport): - def __init__(self): - self._count = 0 + def __init__(self, error=True): + self._error = error def __exit__(self, exc_type, exc_val, exc_tb): pass @@ -34,19 +34,24 @@ def send(self, request, **kwargs): request = HttpRequest('GET', 'http://127.0.0.1/') response = HttpResponse(request, None) response.status_code = 200 + response.internal_response = MockInternalResponse(error=False) return response def next(self): self.__next__() def __next__(self): - if self._count == 0: - self._count += 1 + if self._error: raise requests.exceptions.ConnectionError + return None class MockInternalResponse(): + def __init__(self, error=True): + self._error = error + self.status_code = 200 + def iter_content(self, block_size): - return MockTransport() + return MockTransport(error=self._error) def close(self): pass @@ -54,7 +59,8 @@ def close(self): http_request = HttpRequest('GET', 'http://127.0.0.1/') pipeline = Pipeline(MockTransport()) http_response = HttpResponse(http_request, None) - http_response.internal_response = MockInternalResponse() + http_response.internal_response = MockInternalResponse(error=True) + http_response.headers['etag'] = "etag" stream = StreamDownloadGenerator(pipeline, http_response) with mock.patch('time.sleep', return_value=None): with pytest.raises(StopIteration): From 5c97a13e02b856e32082f6c407762aa6bd9e8174 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 16:04:52 -0800 Subject: [PATCH 17/24] updates --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 2 +- .../azure/core/pipeline/transport/_requests_asyncio.py | 2 +- .../azure-core/azure/core/pipeline/transport/_requests_trio.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 6149b2e7fb98..0a13cc32635a 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -212,7 +212,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.downloaded = 0 headers = response.headers if "x-ms-range" in headers: - self.range_header = "x-ms-range" + self.range_header = "x-ms-range" # type: Optional[str] self.range = headers["x-ms-range"] elif "Range" in headers: self.range_header = "Range" diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index dc6903399ad1..9bf4c50c3fab 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -151,7 +151,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.downloaded = 0 headers = response.headers if "x-ms-range" in headers: - self.range_header = "x-ms-range" + self.range_header = "x-ms-range" # type: Optional[str] self.range = headers["x-ms-range"] elif "Range" in headers: self.range_header = "Range" diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index c91c95ade95c..695ffc0d1a7c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -65,7 +65,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.downloaded = 0 headers = response.headers if "x-ms-range" in headers: - self.range_header = "x-ms-range" + self.range_header = "x-ms-range" # type: Optional[str] self.range = headers["x-ms-range"] elif "Range" in headers: self.range_header = "Range" From 21f30adc6c1e10c10ed31430523c9aa761888cfa Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 16:22:36 -0800 Subject: [PATCH 18/24] updates --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 2 -- .../azure/core/pipeline/transport/_requests_asyncio.py | 1 - .../azure-core/azure/core/pipeline/transport/_requests_trio.py | 2 -- 3 files changed, 5 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 0a13cc32635a..63916075d695 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -231,7 +231,6 @@ async def __anext__(self): try: chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: - await self.response.internal_response.close() raise _ResponseStopIteration() self.downloaded += self.block_size return chunk @@ -263,7 +262,6 @@ async def __anext__(self): self.response = resp.http_response chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: - await self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index 9bf4c50c3fab..49e8877f3edf 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -211,7 +211,6 @@ async def __anext__(self): self.iter_content_func, ) if not chunk: - await self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index 695ffc0d1a7c..56241f6c257e 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -93,7 +93,6 @@ async def __anext__(self): self.iter_content_func, ) if not chunk: - await self.response.internal_response.close() raise _ResponseStopIteration() self.downloaded += self.block_size return chunk @@ -134,7 +133,6 @@ async def __anext__(self): self.iter_content_func, ) if not chunk: - await self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk From cd8acc658dc262b2456ad3d915f8b26ba4fab088 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 25 Feb 2021 17:06:07 -0800 Subject: [PATCH 19/24] updates --- .../azure-core/azure/core/pipeline/transport/_aiohttp.py | 8 +++++--- .../azure-core/azure/core/pipeline/transport/_base.py | 5 ++--- .../azure/core/pipeline/transport/_requests_asyncio.py | 6 +++--- .../azure/core/pipeline/transport/_requests_basic.py | 6 +++--- .../azure/core/pipeline/transport/_requests_trio.py | 6 +++--- 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 63916075d695..05986d3a8a24 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -213,10 +213,10 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: headers = response.headers if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] - self.range = headers["x-ms-range"] + self.range = parse_range_header(headers["x-ms-range"]) elif "Range" in headers: self.range_header = "Range" - self.range = headers["Range"] + self.range = parse_range_header(headers["Range"]) else: self.range_header = None self.etag = response.headers['etag'] if 'etag' in headers else None @@ -224,13 +224,14 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: def __len__(self): return self.content_length - async def __anext__(self): + async def __anext__(self): # pylint:disable=too-many-statements retry_active = True retry_total = 3 retry_interval = 1 # 1 second try: chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: + self.response.internal_response.close() raise _ResponseStopIteration() self.downloaded += self.block_size return chunk @@ -262,6 +263,7 @@ async def __anext__(self): self.response = resp.http_response chunk = await self.response.internal_response.content.read(self.block_size) if not chunk: + self.response.internal_response.close() raise StopAsyncIteration() self.downloaded += self.block_size return chunk diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_base.py b/sdk/core/azure-core/azure/core/pipeline/transport/_base.py index 6ad405850735..d3e4e041f267 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_base.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_base.py @@ -969,8 +969,8 @@ def parse_range_header(header_value): range_value = header_value.strip() if not range_value.startswith("bytes="): raise ValueError("Invalid header") - range = range_value[6:] - ret = range.split("-") + range_str = range_value[6:] + ret = range_str.split("-") if len(ret) < 2: raise ValueError("Invalid header") start = int(ret[0]) if ret[0] else -1 @@ -985,4 +985,3 @@ def make_range_header(original_range, downloaded_size=0): if original_range[1] == -1: return "bytes=" + str(start) + "-" return "bytes=" + str(start) + "-" + str(original_range[1]) - diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index 49e8877f3edf..b62eb34c5327 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -152,10 +152,10 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: headers = response.headers if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] - self.range = headers["x-ms-range"] + self.range = parse_range_header(headers["x-ms-range"]) elif "Range" in headers: self.range_header = "Range" - self.range = headers["Range"] + self.range = parse_range_header(headers["Range"]) else: self.range_header = None self.etag = response.headers['etag'] if 'etag' in headers else None @@ -163,7 +163,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: def __len__(self): return self.content_length - async def __anext__(self): + async def __anext__(self): # pylint:disable=too-many-statements loop = _get_running_loop() retry_active = True retry_total = 3 diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 6fcf1be8107c..f787dfa5e9bc 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -113,10 +113,10 @@ def __init__(self, pipeline, response): headers = response.headers if "x-ms-range" in headers: self.range_header = "x-ms-range" - self.range = headers["x-ms-range"] + self.range = parse_range_header(headers["x-ms-range"]) elif "Range" in headers: self.range_header = "Range" - self.range = headers["Range"] + self.range = parse_range_header(headers["Range"]) else: self.range_header = None self.etag = response.headers['etag'] if 'etag' in headers else None @@ -127,7 +127,7 @@ def __len__(self): def __iter__(self): return self - def __next__(self): + def __next__(self): # pylint:disable=too-many-statements retry_active = True retry_total = 3 retry_interval = 1 # 1 second diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index 56241f6c257e..926950e5b9e4 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -66,10 +66,10 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: headers = response.headers if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] - self.range = headers["x-ms-range"] + self.range = parse_range_header(headers["x-ms-range"]) elif "Range" in headers: self.range_header = "Range" - self.range = headers["Range"] + self.range = parse_range_header(headers["Range"]) else: self.range_header = None self.etag = response.headers['etag'] if 'etag' in headers else None @@ -77,7 +77,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: def __len__(self): return self.content_length - async def __anext__(self): + async def __anext__(self): # pylint:disable=too-many-statements retry_active = True retry_total = 3 retry_interval = 1 # 1 second From ba3fde36181b1faa11fcc8c06666bd88ac02a3ce Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Fri, 26 Feb 2021 08:43:44 -0800 Subject: [PATCH 20/24] update --- .../azure-core/azure/core/pipeline/transport/_requests_basic.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index f787dfa5e9bc..6b080db486af 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -27,7 +27,6 @@ import logging from typing import Iterator, Optional, Any, Union, TypeVar import time -import six import urllib3 # type: ignore from urllib3.util.retry import Retry # type: ignore import requests From 5d93063bdfafa36beea4c91c8af74e105fda7423 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 4 Mar 2021 10:33:57 -0800 Subject: [PATCH 21/24] update --- .../azure-core/azure/core/pipeline/transport/_aiohttp.py | 8 ++++++-- .../azure/core/pipeline/transport/_requests_asyncio.py | 8 ++++++-- .../azure/core/pipeline/transport/_requests_basic.py | 9 +++++++-- .../azure/core/pipeline/transport/_requests_trio.py | 8 ++++++-- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 05986d3a8a24..02bbc3a01869 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -208,9 +208,11 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.request = response.request self.response = response self.block_size = response.block_size - self.content_length = int(response.internal_response.headers.get('Content-Length', 0)) self.downloaded = 0 - headers = response.headers + headers = response.internal_response.headers + self.content_length = int(headers.get('Content-Length', 0)) + self._compressed = True if 'compress' in transfer_header \ + or 'deflate' in transfer_header or 'gzip' in transfer_header else False if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] self.range = parse_range_header(headers["x-ms-range"]) @@ -238,6 +240,8 @@ async def __anext__(self): # pylint:disable=too-many-statements except _ResponseStopIteration: raise StopAsyncIteration() except (ChunkedEncodingError, ConnectionError) as ex: + if self._compressed: + raise ex while retry_active: retry_total -= 1 if retry_total <= 0: diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index b62eb34c5327..f64687e6849a 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -147,9 +147,11 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.response = response self.block_size = response.block_size self.iter_content_func = self.response.internal_response.iter_content(self.block_size) - self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 - headers = response.headers + headers = response.internal_response.headers + self.content_length = int(headers.get('Content-Length', 0)) + self._compressed = True if 'compress' in transfer_header \ + or 'deflate' in transfer_header or 'gzip' in transfer_header else False if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] self.range = parse_range_header(headers["x-ms-range"]) @@ -182,6 +184,8 @@ async def __anext__(self): # pylint:disable=too-many-statements raise StopAsyncIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as ex: + if self._compressed: + raise ex while retry_active: retry_total -= 1 if retry_total <= 0: diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 6b080db486af..10603f57b770 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -107,9 +107,12 @@ def __init__(self, pipeline, response): self.response = response self.block_size = response.block_size self.iter_content_func = self.response.internal_response.iter_content(self.block_size) - self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 - headers = response.headers + headers = response.internal_response.headers + self.content_length = int(headers.get('Content-Length', 0)) + transfer_header = headers.get('Transfer-Encoding', '') + self._compressed = True if 'compress' in transfer_header \ + or 'deflate' in transfer_header or 'gzip' in transfer_header else False if "x-ms-range" in headers: self.range_header = "x-ms-range" self.range = parse_range_header(headers["x-ms-range"]) @@ -141,6 +144,8 @@ def __next__(self): # pylint:disable=too-many-statements raise StopIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as ex: + if self._compressed: + raise ex while retry_active: retry_total -= 1 if retry_total <= 0: diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index 926950e5b9e4..0e8022fde606 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -61,9 +61,11 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.response = response self.block_size = response.block_size self.iter_content_func = self.response.internal_response.iter_content(self.block_size) - self.content_length = int(response.headers.get('Content-Length', 0)) self.downloaded = 0 - headers = response.headers + headers = response.internal_response.headers + self.content_length = int(headers.get('Content-Length', 0)) + self._compressed = True if 'compress' in transfer_header \ + or 'deflate' in transfer_header or 'gzip' in transfer_header else False if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] self.range = parse_range_header(headers["x-ms-range"]) @@ -100,6 +102,8 @@ async def __anext__(self): # pylint:disable=too-many-statements raise StopAsyncIteration() except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as ex: + if self._compressed: + raise ex while retry_active: retry_total -= 1 if retry_total <= 0: From 207a61342e891112222111b0cdd8123a6de81b1f Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 4 Mar 2021 12:25:44 -0800 Subject: [PATCH 22/24] update --- .../azure-core/azure/core/pipeline/transport/_aiohttp.py | 3 ++- .../azure/core/pipeline/transport/_requests_asyncio.py | 3 ++- .../azure/core/pipeline/transport/_requests_basic.py | 2 +- .../azure/core/pipeline/transport/_requests_trio.py | 3 ++- .../tests/async_tests/test_stream_generator_async.py | 3 +-- sdk/core/azure-core/tests/test_stream_generator.py | 5 ++++- 6 files changed, 12 insertions(+), 7 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 02bbc3a01869..8eecb844d766 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -211,6 +211,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.downloaded = 0 headers = response.internal_response.headers self.content_length = int(headers.get('Content-Length', 0)) + transfer_header = headers.get('Transfer-Encoding', '') self._compressed = True if 'compress' in transfer_header \ or 'deflate' in transfer_header or 'gzip' in transfer_header else False if "x-ms-range" in headers: @@ -221,7 +222,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.range = parse_range_header(headers["Range"]) else: self.range_header = None - self.etag = response.headers['etag'] if 'etag' in headers else None + self.etag = headers.get('etag') def __len__(self): return self.content_length diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index f64687e6849a..afd6b889da7b 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -150,6 +150,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.downloaded = 0 headers = response.internal_response.headers self.content_length = int(headers.get('Content-Length', 0)) + transfer_header = headers.get('Transfer-Encoding', '') self._compressed = True if 'compress' in transfer_header \ or 'deflate' in transfer_header or 'gzip' in transfer_header else False if "x-ms-range" in headers: @@ -160,7 +161,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.range = parse_range_header(headers["Range"]) else: self.range_header = None - self.etag = response.headers['etag'] if 'etag' in headers else None + self.etag = headers.get('etag') def __len__(self): return self.content_length diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 10603f57b770..ef1cf94bf75a 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -121,7 +121,7 @@ def __init__(self, pipeline, response): self.range = parse_range_header(headers["Range"]) else: self.range_header = None - self.etag = response.headers['etag'] if 'etag' in headers else None + self.etag = headers.get('etag') def __len__(self): return self.content_length diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index 0e8022fde606..af260931ffe1 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -64,6 +64,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.downloaded = 0 headers = response.internal_response.headers self.content_length = int(headers.get('Content-Length', 0)) + transfer_header = headers.get('Transfer-Encoding', '') self._compressed = True if 'compress' in transfer_header \ or 'deflate' in transfer_header or 'gzip' in transfer_header else False if "x-ms-range" in headers: @@ -74,7 +75,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: self.range = parse_range_header(headers["Range"]) else: self.range_header = None - self.etag = response.headers['etag'] if 'etag' in headers else None + self.etag = headers.get('etag') def __len__(self): return self.content_length diff --git a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py index 854d0fdd752c..486b570c6afe 100644 --- a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py +++ b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py @@ -46,7 +46,7 @@ async def read(self, block_size): class MockInternalResponse(): def __init__(self, error=True): - self.headers = {} + self.headers = {"etag": "etag"} self._error = error self.content = MockContent(error=self._error) self.status_code = 200 @@ -62,7 +62,6 @@ async def __call__(self, *args, **kwargs): pipeline = AsyncPipeline(MockTransport()) http_response = AsyncHttpResponse(http_request, None) http_response.internal_response = MockInternalResponse() - http_response.headers['etag'] = "etag" stream = AioHttpStreamDownloadGenerator(pipeline, http_response) with mock.patch('asyncio.sleep', new_callable=AsyncMock): with pytest.raises(StopAsyncIteration): diff --git a/sdk/core/azure-core/tests/test_stream_generator.py b/sdk/core/azure-core/tests/test_stream_generator.py index 5e54a595a477..65aaeb19f94d 100644 --- a/sdk/core/azure-core/tests/test_stream_generator.py +++ b/sdk/core/azure-core/tests/test_stream_generator.py @@ -49,6 +49,7 @@ class MockInternalResponse(): def __init__(self, error=True): self._error = error self.status_code = 200 + self.headers = {"etag":"etag"} def iter_content(self, block_size): return MockTransport(error=self._error) @@ -60,7 +61,6 @@ def close(self): pipeline = Pipeline(MockTransport()) http_response = HttpResponse(http_request, None) http_response.internal_response = MockInternalResponse(error=True) - http_response.headers['etag'] = "etag" stream = StreamDownloadGenerator(pipeline, http_response) with mock.patch('time.sleep', return_value=None): with pytest.raises(StopIteration): @@ -93,6 +93,9 @@ def __next__(self): raise requests.exceptions.ConnectionError class MockInternalResponse(): + def __init__(self): + self.headers = {} + def iter_content(self, block_size): return MockTransport() From f2016402a4990ad665468d7301daa8e2f2a4ec24 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 4 Mar 2021 12:59:08 -0800 Subject: [PATCH 23/24] update --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 3 +-- .../azure/core/pipeline/transport/_requests_asyncio.py | 3 +-- .../azure/core/pipeline/transport/_requests_basic.py | 3 +-- .../azure-core/azure/core/pipeline/transport/_requests_trio.py | 3 +-- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 8eecb844d766..e4eeb0cf942d 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -212,8 +212,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: headers = response.internal_response.headers self.content_length = int(headers.get('Content-Length', 0)) transfer_header = headers.get('Transfer-Encoding', '') - self._compressed = True if 'compress' in transfer_header \ - or 'deflate' in transfer_header or 'gzip' in transfer_header else False + self._compressed = 'compress' in transfer_header or 'deflate' in transfer_header or 'gzip' in transfer_header if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] self.range = parse_range_header(headers["x-ms-range"]) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index afd6b889da7b..9582c59b865c 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -151,8 +151,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: headers = response.internal_response.headers self.content_length = int(headers.get('Content-Length', 0)) transfer_header = headers.get('Transfer-Encoding', '') - self._compressed = True if 'compress' in transfer_header \ - or 'deflate' in transfer_header or 'gzip' in transfer_header else False + self._compressed = 'compress' in transfer_header or 'deflate' in transfer_header or 'gzip' in transfer_header if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] self.range = parse_range_header(headers["x-ms-range"]) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index ef1cf94bf75a..3c1e899b6649 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -111,8 +111,7 @@ def __init__(self, pipeline, response): headers = response.internal_response.headers self.content_length = int(headers.get('Content-Length', 0)) transfer_header = headers.get('Transfer-Encoding', '') - self._compressed = True if 'compress' in transfer_header \ - or 'deflate' in transfer_header or 'gzip' in transfer_header else False + self._compressed = 'compress' in transfer_header or 'deflate' in transfer_header or 'gzip' in transfer_header if "x-ms-range" in headers: self.range_header = "x-ms-range" self.range = parse_range_header(headers["x-ms-range"]) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index af260931ffe1..dbbc74761a12 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -65,8 +65,7 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None: headers = response.internal_response.headers self.content_length = int(headers.get('Content-Length', 0)) transfer_header = headers.get('Transfer-Encoding', '') - self._compressed = True if 'compress' in transfer_header \ - or 'deflate' in transfer_header or 'gzip' in transfer_header else False + self._compressed = 'compress' in transfer_header or 'deflate' in transfer_header or 'gzip' in transfer_header if "x-ms-range" in headers: self.range_header = "x-ms-range" # type: Optional[str] self.range = parse_range_header(headers["x-ms-range"]) From 5acd6b9fca5e075e37398e72464c5dd75efe1cd2 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 4 Mar 2021 13:41:02 -0800 Subject: [PATCH 24/24] update --- sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py | 2 +- .../azure/core/pipeline/transport/_requests_asyncio.py | 2 +- .../azure-core/azure/core/pipeline/transport/_requests_basic.py | 2 +- .../azure-core/azure/core/pipeline/transport/_requests_trio.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index e4eeb0cf942d..7b0e643ff8ed 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -195,7 +195,7 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR return response -class AioHttpStreamDownloadGenerator(AsyncIterator): +class AioHttpStreamDownloadGenerator(AsyncIterator): # pylint: disable=too-many-instance-attributes """Streams the response body data. :param pipeline: The pipeline object diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index 9582c59b865c..156ef5a39cc1 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -133,7 +133,7 @@ async def send(self, request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse: return AsyncioRequestsTransportResponse(request, response, self.connection_config.data_block_size) -class AsyncioStreamDownloadGenerator(AsyncIterator): +class AsyncioStreamDownloadGenerator(AsyncIterator): # pylint: disable=too-many-instance-attributes """Streams the response body data. :param pipeline: The pipeline object diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 3c1e899b6649..780e04c83798 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -95,7 +95,7 @@ def text(self, encoding=None): return self.internal_response.text -class StreamDownloadGenerator(object): +class StreamDownloadGenerator(object): # pylint: disable=too-many-instance-attributes """Generator for streaming response data. :param pipeline: The pipeline object diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index dbbc74761a12..5ab660a60e41 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -49,7 +49,7 @@ _LOGGER = logging.getLogger(__name__) -class TrioStreamDownloadGenerator(AsyncIterator): +class TrioStreamDownloadGenerator(AsyncIterator): # pylint: disable=too-many-instance-attributes """Generator for streaming response data. :param pipeline: The pipeline object