diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 935f34db..fee6c695 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -1,16 +1,22 @@ +import logging import ssl import sys from types import TracebackType from typing import AsyncIterable, AsyncIterator, Iterable, List, Optional, Type +import httpcore + from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response from .._synchronization import AsyncEvent, AsyncLock +from .._trace import atrace from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface +logger = logging.getLogger("httpcore.connection_pool") + class RequestStatus: def __init__(self, request: Request): @@ -161,8 +167,14 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Reuse an existing connection if one is currently available. for idx, connection in enumerate(self._pool): if connection.can_handle_request(origin) and connection.is_available(): + kwargs = {"connection": connection, "request": status.request} + await atrace("reuse_connection", logger, status.request, kwargs) self._pool.pop(idx) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + await atrace( + "assign_request_to_connection", logger, status.request, kwargs + ) status.set_connection(connection) return True @@ -171,6 +183,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle(): await connection.aclose() + await atrace( + "remove_connection", + logger, + None, + kwargs={"connection": connection}, + ) self._pool.pop(idx) break @@ -180,7 +198,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Otherwise create a new connection. connection = self.create_connection(origin) + await atrace( + "add_connection", logger, status.request, kwargs={"connection": connection} + ) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + await atrace("assign_request_to_connection", logger, status.request, kwargs) status.set_connection(connection) return True @@ -192,6 +215,9 @@ async def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.has_expired(): await connection.aclose() + await atrace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) # If the pool size exceeds the maximum number of allowed keep-alive connections, @@ -200,6 +226,9 @@ async def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle() and pool_size > self._max_keepalive_connections: await connection.aclose() + await atrace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) pool_size -= 1 @@ -222,6 +251,7 @@ async def handle_async_request(self, request: Request) -> Response: status = RequestStatus(request) async with self._pool_lock: + await atrace("add_request", logger, request, {"request": status.request}) self._requests.append(status) await self._close_expired_connections() await self._attempt_to_acquire_connection(status) @@ -235,9 +265,22 @@ async def handle_async_request(self, request: Request) -> Response: # If we timeout here, or if the task is cancelled, then make # sure to remove the request from the queue before bubbling # up the exception. + if isinstance(exc, httpcore.PoolTimeout): + await atrace( + "timeout_waiting_for_connection", + logger, + status.request, + {"request": status.request}, + ) async with self._pool_lock: # Ensure only remove when task exists. if status in self._requests: + await atrace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) raise exc @@ -254,6 +297,13 @@ async def handle_async_request(self, request: Request) -> Response: async with self._pool_lock: # Maintain our position in the request queue, but reset the # status so that the request becomes queued again. + kwargs = {"request": status.request, "connection": connection} + await atrace( + "unassign_request_from_connection", + logger, + status.request, + kwargs, + ) status.unset_connection() await self._attempt_to_acquire_connection(status) except BaseException as exc: @@ -285,6 +335,12 @@ async def response_closed(self, status: RequestStatus) -> None: async with self._pool_lock: # Update the state of the connection pool. if status in self._requests: + await atrace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) if connection.is_closed() and connection in self._pool: diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index f64334af..43f7d732 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -1,16 +1,22 @@ +import logging import ssl import sys from types import TracebackType from typing import Iterable, Iterator, Iterable, List, Optional, Type +import httpcore + from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol from .._models import Origin, Request, Response from .._synchronization import Event, Lock +from .._trace import trace from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface +logger = logging.getLogger("httpcore.connection_pool") + class RequestStatus: def __init__(self, request: Request): @@ -161,8 +167,14 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Reuse an existing connection if one is currently available. for idx, connection in enumerate(self._pool): if connection.can_handle_request(origin) and connection.is_available(): + kwargs = {"connection": connection, "request": status.request} + trace("reuse_connection", logger, status.request, kwargs) self._pool.pop(idx) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + trace( + "assign_request_to_connection", logger, status.request, kwargs + ) status.set_connection(connection) return True @@ -171,6 +183,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle(): connection.close() + trace( + "remove_connection", + logger, + None, + kwargs={"connection": connection}, + ) self._pool.pop(idx) break @@ -180,7 +198,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool: # Otherwise create a new connection. connection = self.create_connection(origin) + trace( + "add_connection", logger, status.request, kwargs={"connection": connection} + ) self._pool.insert(0, connection) + kwargs = {"request": status.request, "connection": connection} + trace("assign_request_to_connection", logger, status.request, kwargs) status.set_connection(connection) return True @@ -192,6 +215,9 @@ def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.has_expired(): connection.close() + trace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) # If the pool size exceeds the maximum number of allowed keep-alive connections, @@ -200,6 +226,9 @@ def _close_expired_connections(self) -> None: for idx, connection in reversed(list(enumerate(self._pool))): if connection.is_idle() and pool_size > self._max_keepalive_connections: connection.close() + trace( + "remove_connection", logger, None, kwargs={"connection": connection} + ) self._pool.pop(idx) pool_size -= 1 @@ -222,6 +251,7 @@ def handle_request(self, request: Request) -> Response: status = RequestStatus(request) with self._pool_lock: + trace("add_request", logger, request, {"request": status.request}) self._requests.append(status) self._close_expired_connections() self._attempt_to_acquire_connection(status) @@ -235,9 +265,22 @@ def handle_request(self, request: Request) -> Response: # If we timeout here, or if the task is cancelled, then make # sure to remove the request from the queue before bubbling # up the exception. + if isinstance(exc, httpcore.PoolTimeout): + trace( + "timeout_waiting_for_connection", + logger, + status.request, + {"request": status.request}, + ) with self._pool_lock: # Ensure only remove when task exists. if status in self._requests: + trace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) raise exc @@ -254,6 +297,13 @@ def handle_request(self, request: Request) -> Response: with self._pool_lock: # Maintain our position in the request queue, but reset the # status so that the request becomes queued again. + kwargs = {"request": status.request, "connection": connection} + trace( + "unassign_request_from_connection", + logger, + status.request, + kwargs, + ) status.unset_connection() self._attempt_to_acquire_connection(status) except BaseException as exc: @@ -285,6 +335,12 @@ def response_closed(self, status: RequestStatus) -> None: with self._pool_lock: # Update the state of the connection pool. if status in self._requests: + trace( + "remove_request", + logger, + status.request, + {"request": status.request}, + ) self._requests.remove(status) if connection.is_closed() and connection in self._pool: diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index df0199ab..92367caf 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -136,6 +136,9 @@ async def trace(name, kwargs): await pool.request("GET", "https://example.com/", extensions={"trace": trace}) assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -150,6 +153,7 @@ async def trace(name, kwargs): "http11.receive_response_body.complete", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -175,49 +179,59 @@ async def test_debug_request(caplog): await pool.request("GET", "http://example.com/") assert caplog.record_tuples == [ + ("httpcore.connection_pool", 10, "add_request request="), + ( + "httpcore.connection_pool", + 10, + "add_connection connection=", + ), + ( + "httpcore.connection_pool", + 10, + "assign_request_to_connection request= " + "connection=", + ), ( "httpcore.connection", - logging.DEBUG, - "connect_tcp.started host='example.com' port=80 local_address=None timeout=None socket_options=None", + 10, + "connect_tcp.started host='example.com' port=80 local_address=None " + "timeout=None socket_options=None", ), ( "httpcore.connection", - logging.DEBUG, + 10, "connect_tcp.complete return_value=", ), ( "httpcore.http11", - logging.DEBUG, + 10, "send_request_headers.started request=", ), - ("httpcore.http11", logging.DEBUG, "send_request_headers.complete"), - ( - "httpcore.http11", - logging.DEBUG, - "send_request_body.started request=", - ), - ("httpcore.http11", logging.DEBUG, "send_request_body.complete"), + ("httpcore.http11", 10, "send_request_headers.complete"), + ("httpcore.http11", 10, "send_request_body.started request="), + ("httpcore.http11", 10, "send_request_body.complete"), ( "httpcore.http11", - logging.DEBUG, + 10, "receive_response_headers.started request=", ), ( "httpcore.http11", - logging.DEBUG, - "receive_response_headers.complete return_value=" - "(b'HTTP/1.1', 200, b'OK', [(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", + 10, + "receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', " + "[(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", ), ( "httpcore.http11", - logging.DEBUG, + 10, "receive_response_body.started request=", ), - ("httpcore.http11", logging.DEBUG, "receive_response_body.complete"), - ("httpcore.http11", logging.DEBUG, "response_closed.started"), - ("httpcore.http11", logging.DEBUG, "response_closed.complete"), - ("httpcore.connection", logging.DEBUG, "close.started"), - ("httpcore.connection", logging.DEBUG, "close.complete"), + ("httpcore.http11", 10, "receive_response_body.complete"), + ("httpcore.http11", 10, "response_closed.started"), + ("httpcore.http11", 10, "response_closed.complete"), + ("httpcore.connection_pool", 10, "remove_request request="), + ("httpcore.connection", 10, "close.started"), + ("httpcore.connection", 10, "close.complete"), ] @@ -245,6 +259,9 @@ async def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -257,6 +274,7 @@ async def trace(name, kwargs): "http11.receive_response_headers.failed", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -298,8 +316,12 @@ async def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.failed", + "connection_pool.remove_request", ] diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index aafa68aa..eeec0479 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -136,6 +136,9 @@ def trace(name, kwargs): pool.request("GET", "https://example.com/", extensions={"trace": trace}) assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -150,6 +153,7 @@ def trace(name, kwargs): "http11.receive_response_body.complete", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -175,49 +179,59 @@ def test_debug_request(caplog): pool.request("GET", "http://example.com/") assert caplog.record_tuples == [ + ("httpcore.connection_pool", 10, "add_request request="), + ( + "httpcore.connection_pool", + 10, + "add_connection connection=", + ), + ( + "httpcore.connection_pool", + 10, + "assign_request_to_connection request= " + "connection=", + ), ( "httpcore.connection", - logging.DEBUG, - "connect_tcp.started host='example.com' port=80 local_address=None timeout=None socket_options=None", + 10, + "connect_tcp.started host='example.com' port=80 local_address=None " + "timeout=None socket_options=None", ), ( "httpcore.connection", - logging.DEBUG, + 10, "connect_tcp.complete return_value=", ), ( "httpcore.http11", - logging.DEBUG, + 10, "send_request_headers.started request=", ), - ("httpcore.http11", logging.DEBUG, "send_request_headers.complete"), - ( - "httpcore.http11", - logging.DEBUG, - "send_request_body.started request=", - ), - ("httpcore.http11", logging.DEBUG, "send_request_body.complete"), + ("httpcore.http11", 10, "send_request_headers.complete"), + ("httpcore.http11", 10, "send_request_body.started request="), + ("httpcore.http11", 10, "send_request_body.complete"), ( "httpcore.http11", - logging.DEBUG, + 10, "receive_response_headers.started request=", ), ( "httpcore.http11", - logging.DEBUG, - "receive_response_headers.complete return_value=" - "(b'HTTP/1.1', 200, b'OK', [(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", + 10, + "receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', " + "[(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])", ), ( "httpcore.http11", - logging.DEBUG, + 10, "receive_response_body.started request=", ), - ("httpcore.http11", logging.DEBUG, "receive_response_body.complete"), - ("httpcore.http11", logging.DEBUG, "response_closed.started"), - ("httpcore.http11", logging.DEBUG, "response_closed.complete"), - ("httpcore.connection", logging.DEBUG, "close.started"), - ("httpcore.connection", logging.DEBUG, "close.complete"), + ("httpcore.http11", 10, "receive_response_body.complete"), + ("httpcore.http11", 10, "response_closed.started"), + ("httpcore.http11", 10, "response_closed.complete"), + ("httpcore.connection_pool", 10, "remove_request request="), + ("httpcore.connection", 10, "close.started"), + ("httpcore.connection", 10, "close.complete"), ] @@ -245,6 +259,9 @@ def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.complete", "connection.start_tls.started", @@ -257,6 +274,7 @@ def trace(name, kwargs): "http11.receive_response_headers.failed", "http11.response_closed.started", "http11.response_closed.complete", + "connection_pool.remove_request", ] @@ -298,8 +316,12 @@ def trace(name, kwargs): assert info == [] assert called == [ + "connection_pool.add_request", + "connection_pool.add_connection", + "connection_pool.assign_request_to_connection", "connection.connect_tcp.started", "connection.connect_tcp.failed", + "connection_pool.remove_request", ] diff --git a/unasync.py b/unasync.py index d3607cd4..597a1112 100755 --- a/unasync.py +++ b/unasync.py @@ -27,6 +27,7 @@ ('@pytest.mark.anyio', ''), ('@pytest.mark.trio', ''), ('AutoBackend', 'SyncBackend'), + ('atrace', 'trace'), ] COMPILED_SUBS = [ (re.compile(r'(^|\b)' + regex + r'($|\b)'), repl)