Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement asynchronous AuthorizedSession api request class #1579

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions google/auth/aio/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Transport - Async HTTP client library support.
"""Transport - Asynchronous HTTP client library support.

:mod:`google.auth.aio` is designed to work with various asynchronous client libraries such
as aiohttp. In order to work across these libraries with different
Expand All @@ -25,7 +25,7 @@
"""

import abc
from typing import AsyncGenerator, Dict
from typing import AsyncGenerator, Dict, Mapping, Optional


class Response(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -79,3 +79,53 @@ async def read(self) -> bytes:
async def close(self):
"""Close the response after it is fully consumed to resource."""
raise NotImplementedError("close must be implemented.")


class Request(metaclass=abc.ABCMeta):
"""Interface for a callable that makes HTTP requests.

Specific transport implementations should provide an implementation of
this that adapts their specific request / response API.

.. automethod:: __call__
"""

@abc.abstractmethod
async def __call__(
self,
url: str,
method: str,
body: bytes,
headers: Optional[Mapping[str, str]],
timeout: float,
**kwargs
) -> Response:
"""Make an HTTP request.

Args:
url (str): The URI to be requested.
method (str): The HTTP method to use for the request. Defaults
to 'GET'.
body (bytes): The payload / body in HTTP request.
headers (Mapping[str, str]): Request headers.
timeout (float): The number of seconds to wait for a
response from the server. If not specified or if None, the
transport-specific default timeout will be used.
kwargs: Additional arguments passed on to the transport's
request method.

Returns:
google.auth.aio.transport.Response: The HTTP response.

Raises:
google.auth.exceptions.TransportError: If any exception occurred.
"""
# pylint: disable=redundant-returns-doc, missing-raises-doc
# (pylint doesn't play well with abstract docstrings.)
raise NotImplementedError("__call__ must be implemented.")

async def close(self) -> None:
"""
Close the underlying session.
"""
raise NotImplementedError("close must be implemented.")
111 changes: 105 additions & 6 deletions google/auth/aio/transport/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@
"""Transport adapter for AIOHTTP Requests.
"""

import asyncio
from contextlib import asynccontextmanager
import time
from typing import AsyncGenerator, Dict, Mapping, Optional

try:
import aiohttp
except ImportError as caught_exc: # pragma: NO COVER
raise ImportError(
"The aiohttp library is not installed from please install the aiohttp package to use the aiohttp transport."
) from caught_exc
from typing import AsyncGenerator, Dict

from google.auth import _helpers
from google.auth import exceptions
from google.auth.aio import transport
from google.auth.exceptions import TimeoutError

import asyncio
import time
from contextlib import asynccontextmanager

_DEFAULT_TIMEOUT_SECONDS = 180


@asynccontextmanager
Expand Down Expand Up @@ -77,7 +81,6 @@ async def with_timeout(coro):


class Response(transport.Response):

"""
Represents an HTTP response and its data. It is returned by ``google.auth.aio.transport.sessions.AuthorizedSession``.

Expand Down Expand Up @@ -113,4 +116,100 @@ async def read(self) -> bytes:

@_helpers.copy_docstring(transport.Response)
async def close(self):
return await self._response.close()
self._response.close()


class Request(transport.Request):
"""Asynchronous Requests request adapter.

This class is used internally for making requests using aiohttp
in a consistent way. If you use :class:`AuthorizedSession` you do not need
to construct or use this class directly.

This class can be useful if you want to configure a Request callable
with a custom ``aiohttp.ClientSession`` in :class:`AuthorizedSession` or if
you want to manually refresh a :class:`~google.auth.aio.credentials.Credentials` instance::

import aiohttp
import google.auth.aio.transport.aiohttp

# Use Case 1:
ohmayr marked this conversation as resolved.
Show resolved Hide resolved
request = google.auth.aio.transport.aiohttp.Request()
await credentials.refresh(request)

# Use Case 2:
ohmayr marked this conversation as resolved.
Show resolved Hide resolved
session = session=aiohttp.ClientSession(auto_decompress=False)
request = google.auth.aio.transport.aiohttp.Request(session=session)
auth_sesion = google.auth.aio.transport.sessions.AuthorizedSession(auth_request=request)

Args:
session (aiohttp.ClientSession): An instance :class:`aiohttp.ClientSession` used
to make HTTP requests. If not specified, a session will be created.

.. automethod:: __call__
"""

def __init__(self, session: aiohttp.ClientSession = None):
self.session = session or aiohttp.ClientSession()

async def __call__(
self,
url: str,
method: str = "GET",
body: Optional[bytes] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: float = _DEFAULT_TIMEOUT_SECONDS,
**kwargs,
) -> transport.Response:
"""
Make an HTTP request using aiohttp.

Args:
url (str): The URL to be requested.
method (Optional[str]):
The HTTP method to use for the request. Defaults to 'GET'.
body (Optional[bytes]):
The payload or body in HTTP request.
headers (Optional[Mapping[str, str]]):
Request headers.
timeout (float): The number of seconds to wait for a
response from the server. If not specified or if None, the
requests default timeout will be used.
kwargs: Additional arguments passed through to the underlying
aiohttp :meth:`aiohttp.Session.request` method.

Returns:
google.auth.aio.transport.Response: The HTTP response.

Raises:
- google.auth.exceptions.TransportError: If the request fails.
- google.auth.exceptions.TimeoutError: If the request times out.
"""

try:
client_timeout = aiohttp.ClientTimeout(total=timeout)
response = await self.session.request(
method,
url,
data=body,
headers=headers,
timeout=client_timeout,
**kwargs,
)
return Response(response)

except aiohttp.ClientError as caught_exc:
new_exc = exceptions.TransportError(f"Failed to send request to {url}.")
raise new_exc from caught_exc

except asyncio.TimeoutError as caught_exc:
new_exc = exceptions.TimeoutError(
f"Request timed out after {timeout} seconds."
)
raise new_exc from caught_exc

async def close(self) -> None:
"""
Close the underlying aiohttp session to release the acquired resources.
"""
await self.session.close()
53 changes: 50 additions & 3 deletions tests/transport/aio/test_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from unittest.mock import AsyncMock, Mock, patch

from aioresponses import aioresponses
import pytest # type: ignore
import pytest_asyncio

import asyncio

from google.auth import exceptions
import google.auth.aio.transport.aiohttp as auth_aiohttp

from google.auth.exceptions import TimeoutError


try:
import aiohttp
except ImportError as caught_exc: # pragma: NO COVER
raise ImportError(
"The aiohttp library is not installed from please install the aiohttp package to use the aiohttp transport."
) from caught_exc


@pytest.fixture
async def simple_async_task():
return True
Expand Down Expand Up @@ -139,3 +148,41 @@ async def test_timeout_with_async_task_timing_out_before_context(
assert exc.match(
f"The operation {simple_async_task} exceeded the configured timeout of {self.default_timeout}s."
)


@pytest.mark.asyncio
class TestRequest:
@pytest_asyncio.fixture
async def aiohttp_request(self):
request = auth_aiohttp.Request()
yield request
await request.close()

async def test_request_call_success(self, aiohttp_request):
with aioresponses() as m:
mocked_chunks = [b"Cavefish ", b"have ", b"no ", b"sight."]
mocked_response = b"".join(mocked_chunks)
m.get("http://example.com", status=200, body=mocked_response)
response = await aiohttp_request("http://example.com")
assert response.status_code == 200
assert response.headers == {"Content-Type": "application/json"}
content = b"".join([chunk async for chunk in response.content()])
assert content == b"Cavefish have no sight."

async def test_request_call_raises_client_error(self, aiohttp_request):
with aioresponses() as m:
m.get("http://example.com", exception=aiohttp.ClientError)

with pytest.raises(exceptions.TransportError) as exc:
await aiohttp_request("http://example.com/api")

exc.match("Failed to send request to http://example.com/api.")

async def test_request_call_raises_timeout_error(self, aiohttp_request):
with aioresponses() as m:
m.get("http://example.com", exception=asyncio.TimeoutError)

with pytest.raises(exceptions.TimeoutError) as exc:
await aiohttp_request("http://example.com")

exc.match("Request timed out after 180 seconds.")