Skip to content
1 change: 1 addition & 0 deletions sdk/core/azure-core/azure/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def __init__(self, message=None, response=None, **kwargs):
if response:
self.reason = response.reason
self.status_code = response.status_code

message = message or "Operation returned an invalid status '{}'".format(self.reason)
try:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def deserialize_from_http_generics(cls, response):
# Try to use content-type from headers if available
content_type = None
if response.content_type: # type: ignore
content_type = response.content_type[0].strip().lower() # type: ignore
content_type = response.content_type.split(";")[0].strip().lower() # type: ignore

# Ouch, this server did not declare what it sent...
# Let's guess it's JSON...
Expand Down
26 changes: 12 additions & 14 deletions sdk/core/azure-core/azure/core/pipeline/transport/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import aiohttp

from azure.core.configuration import ConnectionConfiguration
from azure.core.exceptions import ServiceRequestError
from azure.core.exceptions import ServiceRequestError, ServiceResponseError
from azure.core.pipeline import Pipeline

from requests.exceptions import (
Expand Down Expand Up @@ -181,7 +181,8 @@ async def send(self, request: HttpRequest, **config: Any) -> Optional[AsyncHttpR
await response.load_body()
except aiohttp.client_exceptions.ClientConnectorError as err:
error = ServiceRequestError(err, error=err)

except asyncio.TimeoutError as err:
error = ServiceResponseError(err, error=err)
if error:
raise error
return response
Expand All @@ -191,19 +192,16 @@ class AioHttpStreamDownloadGenerator(AsyncIterator):
"""Streams the response body data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The client response object.
:type response: aiohttp.ClientResponse
:param block_size: block size of data sent over connection.
:type block_size: int
"""
def __init__(self, pipeline: Pipeline, request: HttpRequest,
response: aiohttp.ClientResponse, block_size: int) -> None:
def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None:
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.content_length = int(response.headers.get('Content-Length', 0))
self.block_size = response.block_size
self.content_length = int(response.internal_response.headers.get('Content-Length', 0))
self.downloaded = 0

def __len__(self):
Expand All @@ -215,13 +213,13 @@ async def __anext__(self):
retry_interval = 1000
while retry_active:
try:
chunk = await self.response.content.read(self.block_size)
chunk = await self.response.internal_response.content.read(self.block_size)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.close()
self.response.internal_response.close()
raise StopAsyncIteration()
except (ChunkedEncodingError, ConnectionError):
retry_total -= 1
Expand All @@ -233,7 +231,7 @@ async def __anext__(self):
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
chunk = await self.response.content.read(self.block_size)
chunk = await self.response.internal_response.content.read(self.block_size)
if not chunk:
raise StopIteration()
self.downloaded += chunk
Expand All @@ -243,7 +241,7 @@ async def __anext__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise

class AioHttpTransportResponse(AsyncHttpResponse):
Expand Down Expand Up @@ -282,4 +280,4 @@ def stream_download(self, pipeline) -> AsyncIteratorType[bytes]:
:param pipeline: The pipeline object
:type pipeline: azure.core.pipeline
"""
return AioHttpStreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size)
return AioHttpStreamDownloadGenerator(pipeline, self)
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ async def __aenter__(self):
async def __aexit__(self, *exc_details): # pylint: disable=arguments-differ
return super(AsyncioRequestsTransport, self).__exit__()

async def sleep(self, duration):
await asyncio.sleep(duration)

async def send(self, request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse: # type: ignore
"""Send the request using this HTTP sender.

Expand Down Expand Up @@ -135,18 +138,16 @@ class AsyncioStreamDownloadGenerator(AsyncIterator):
"""Streams the response body data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: block size of data sent over connection.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None:
def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None:
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.block_size = response.block_size
self.iter_content_func = self.response.internal_response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

Expand All @@ -170,7 +171,7 @@ async def __anext__(self):
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.close()
self.response.internal_response.close()
raise StopAsyncIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
Expand All @@ -197,7 +198,7 @@ async def __anext__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise


Expand All @@ -206,5 +207,4 @@ class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportRespo
"""
def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore
"""Generator for streaming request body data."""
return AsyncioStreamDownloadGenerator(pipeline, self.request,
self.internal_response, self.block_size) # type: ignore
return AsyncioStreamDownloadGenerator(pipeline, self) # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ def __init__(self, request, requests_response, block_size=None):
self.status_code = requests_response.status_code
self.headers = requests_response.headers
self.reason = requests_response.reason
content_type = requests_response.headers.get('content-type')
if content_type:
self.content_type = content_type.split(";")
self.content_type = requests_response.headers.get('content-type')

def body(self):
return self.internal_response.content
Expand All @@ -82,18 +80,16 @@ class StreamDownloadGenerator(object):
"""Generator for streaming response data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: Number of bytes to read into memory.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, pipeline, request, response, block_size):
def __init__(self, pipeline, response):
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.block_size = response.block_size
self.iter_content_func = self.response.internal_response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

Expand All @@ -115,7 +111,7 @@ def __next__(self):
self.downloaded += self.block_size
return chunk
except StopIteration:
self.response.close()
self.response.internal_response.close()
raise StopIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
Expand All @@ -138,7 +134,7 @@ def __next__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise
next = __next__ # Python 2 compatibility.

Expand All @@ -149,7 +145,7 @@ class RequestsTransportResponse(HttpResponse, _RequestsTransportResponseBase):
def stream_download(self, pipeline):
# type: (PipelineType) -> Iterator[bytes]
"""Generator for streaming request body data."""
return StreamDownloadGenerator(pipeline, self.request, self.internal_response, self.block_size)
return StreamDownloadGenerator(pipeline, self)


class RequestsTransport(HttpTransport):
Expand Down
17 changes: 7 additions & 10 deletions sdk/core/azure-core/azure/core/pipeline/transport/requests_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,16 @@ class TrioStreamDownloadGenerator(AsyncIterator):
"""Generator for streaming response data.

:param pipeline: The pipeline object
:param request: The request object
:param response: The response object.
:param int block_size: Number of bytes to read into memory.
:param generator iter_content_func: Iterator for response data.
:param int content_length: size of body in bytes.
"""
def __init__(self, pipeline: Pipeline, request: HttpRequest, response: requests.Response, block_size: int) -> None:
def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse) -> None:
self.pipeline = pipeline
self.request = request
self.request = response.request
self.response = response
self.block_size = block_size
self.iter_content_func = self.response.iter_content(self.block_size)
self.block_size = response.block_size
self.iter_content_func = self.response.internal_response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

Expand All @@ -85,7 +83,7 @@ async def __anext__(self):
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.close()
self.response.internal_response.close()
raise StopAsyncIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
Expand All @@ -111,7 +109,7 @@ async def __anext__(self):
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.close()
self.response.internal_response.close()
raise

class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore
Expand All @@ -120,8 +118,7 @@ class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse
def stream_download(self, pipeline) -> AsyncIteratorType[bytes]: # type: ignore
"""Generator for streaming response data.
"""
return TrioStreamDownloadGenerator(pipeline, self.request,
self.internal_response, self.block_size) # type: ignore
return TrioStreamDownloadGenerator(pipeline, self) # type: ignore


class TrioRequestsTransport(RequestsTransport, AsyncHttpTransport): # type: ignore
Expand Down
17 changes: 17 additions & 0 deletions sdk/core/azure-core/tests/azure_core_asynctests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ async def test_basic_async_requests():

assert response.http_response.status_code == 200

@pytest.mark.asyncio
async def test_async_transport_sleep():

async with AsyncioRequestsTransport() as transport:
await transport.sleep(1)

async with AioHttpTransport() as transport:
await transport.sleep(1)

def test_async_trio_transport_sleep():

async def do():
async with TrioRequestsTransport() as transport:
await transport.sleep(1)

response = trio.run(do)

@pytest.mark.asyncio
async def test_conf_async_requests():

Expand Down
2 changes: 1 addition & 1 deletion sdk/identity/azure-identity/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def mock_response(status_code=200, headers={}, json_payload=None):
if json_payload is not None:
response.text = lambda: json.dumps(json_payload)
response.headers["content-type"] = "application/json"
response.content_type = ["application/json"]
response.content_type = "application/json"
return response


Expand Down
8 changes: 4 additions & 4 deletions sdk/identity/azure-identity/tests/test_authn_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_authn_client_deserialization():
scope = "scope"

mock_response = Mock(
headers={"content-type": "application/json"}, status_code=200, content_type=["application/json"]
headers={"content-type": "application/json"}, status_code=200, content_type="application/json"
)
mock_send = Mock(return_value=mock_response)

Expand Down Expand Up @@ -87,7 +87,7 @@ def test_caching_when_only_expires_in_set():
text=lambda: json.dumps({"access_token": access_token, "expires_in": expires_in, "token_type": "Bearer"}),
headers={"content-type": "application/json"},
status_code=200,
content_type=["application/json"],
content_type="application/json",
)
mock_send = Mock(return_value=mock_response)

Expand All @@ -106,7 +106,7 @@ def test_expires_in_strings():
expected_token = "token"

mock_response = Mock(
headers={"content-type": "application/json"}, status_code=200, content_type=["application/json"]
headers={"content-type": "application/json"}, status_code=200, content_type="application/json"
)
mock_send = Mock(return_value=mock_response)

Expand All @@ -133,7 +133,7 @@ def test_cache_expiry():
text=lambda: json.dumps(token_payload),
headers={"content-type": "application/json"},
status_code=200,
content_type=["application/json"],
content_type="application/json",
)
mock_send = Mock(return_value=mock_response)

Expand Down
4 changes: 2 additions & 2 deletions sdk/identity/azure-identity/tests/test_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def test_imds_credential_cache():
text=lambda: json.dumps(token_payload),
headers={"content-type": "application/json"},
status_code=200,
content_type=["application/json"],
content_type="application/json",
)
mock_send = Mock(return_value=mock_response)

Expand Down Expand Up @@ -219,7 +219,7 @@ def test_imds_credential_retries():
mock_response = Mock(
text=lambda: b"{}",
headers={"content-type": "application/json", "Retry-After": "0"},
content_type=["application/json"],
content_type="application/json",
)
mock_send = Mock(return_value=mock_response)

Expand Down
4 changes: 2 additions & 2 deletions sdk/identity/azure-identity/tests/test_identity_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ async def test_imds_credential_cache():
text=lambda: json.dumps(token_payload),
headers={"content-type": "application/json"},
status_code=200,
content_type=["application/json"],
content_type="application/json",
)
mock_send = Mock(return_value=mock_response)

Expand Down Expand Up @@ -225,7 +225,7 @@ async def test_imds_credential_retries():
mock_response = Mock(
text=lambda: b"{}",
headers={"content-type": "application/json", "Retry-After": "0"},
content_type=["application/json"],
content_type="application/json",
)
mock_send = Mock(return_value=mock_response)

Expand Down