From d148b754fbfc2922fa0ff5d75bb877469f609839 Mon Sep 17 00:00:00 2001 From: Andrew Geng Date: Fri, 7 Jul 2023 11:49:12 -0400 Subject: [PATCH 1/4] Cut BaseProtocol circular reference on close. A bound method contains a reference to the instance it's bound to. Most of the time, bound methods are created lazily at access time by the descriptor protocol and discarded after calling. But saving a bound method as another attribute on the instance creates a long-lived cycle, here `.timeout_callback.__self__`, that needs to be explicitly broken if we don't want to wake up python's garbage collector to do it. Without this change, the new assertion in the tests would fail, and `pytest --pdb` would show the bound methods `_on_timeout` and `_on_waiter_completed` at the end of `p gc.get_referrers(protoref())`. --- asyncpg/protocol/protocol.pyx | 2 ++ tests/test_connect.py | 30 ++++++++++++++++++++++-------- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx index 1f739cc2..21f7e93e 100644 --- a/asyncpg/protocol/protocol.pyx +++ b/asyncpg/protocol/protocol.pyx @@ -614,6 +614,7 @@ cdef class BaseProtocol(CoreProtocol): # Ask the server to terminate the connection and wait for it # to drop. self.waiter = self._new_waiter(timeout) + self.timeout_callback = self.completed_callback = None self._terminate() try: await self.waiter @@ -682,6 +683,7 @@ cdef class BaseProtocol(CoreProtocol): exc.__cause__ = cause self.waiter.set_exception(exc) self.waiter = None + self.timeout_callback = self.completed_callback = None cdef _set_server_parameter(self, name, val): self.settings.add_setting(name, val) diff --git a/tests/test_connect.py b/tests/test_connect.py index f61db61a..4bd1d4f0 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -7,6 +7,7 @@ import asyncio import contextlib +import gc import ipaddress import os import pathlib @@ -1846,14 +1847,27 @@ async def worker(): class TestConnectionGC(tb.ClusterTestCase): async def _run_no_explicit_close_test(self): - con = await self.connect() - await con.fetchval("select 123") - proto = con._protocol - conref = weakref.ref(con) - del con - - self.assertIsNone(conref()) - self.assertTrue(proto.is_closed()) + gc_was_enabled = gc.isenabled() + gc.disable() + try: + con = await self.connect() + await con.fetchval("select 123") + proto = con._protocol + conref = weakref.ref(con) + del con + + self.assertIsNone(conref()) + self.assertTrue(proto.is_closed()) + + # tick event loop so asyncio.selector_events._SelectorSocketTransport + # has a chance to close itself and remove its reference to proto + await asyncio.sleep(0) + protoref = weakref.ref(proto) + del proto + self.assertIsNone(protoref()) + finally: + if gc_was_enabled: + gc.enable() async def test_no_explicit_close_no_debug(self): olddebug = self.loop.get_debug() From 6a0296238048c5ec2efcd3e5205c3138a1cc596b Mon Sep 17 00:00:00 2001 From: pteromys Date: Sat, 8 Jul 2023 00:16:24 -0400 Subject: [PATCH 2/4] Just remove the callbacks instead. --- asyncpg/protocol/protocol.pxd | 2 -- asyncpg/protocol/protocol.pyx | 8 ++------ 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/asyncpg/protocol/protocol.pxd b/asyncpg/protocol/protocol.pxd index 5f144e55..a9ac8d5f 100644 --- a/asyncpg/protocol/protocol.pxd +++ b/asyncpg/protocol/protocol.pxd @@ -39,8 +39,6 @@ cdef class BaseProtocol(CoreProtocol): bint return_extra object create_future object timeout_handle - object timeout_callback - object completed_callback object conref type record_class bint is_reading diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx index 21f7e93e..24511c01 100644 --- a/asyncpg/protocol/protocol.pyx +++ b/asyncpg/protocol/protocol.pyx @@ -98,8 +98,6 @@ cdef class BaseProtocol(CoreProtocol): self.writing_allowed.set() self.timeout_handle = None - self.timeout_callback = self._on_timeout - self.completed_callback = self._on_waiter_completed self.queries_count = 0 @@ -614,7 +612,6 @@ cdef class BaseProtocol(CoreProtocol): # Ask the server to terminate the connection and wait for it # to drop. self.waiter = self._new_waiter(timeout) - self.timeout_callback = self.completed_callback = None self._terminate() try: await self.waiter @@ -683,7 +680,6 @@ cdef class BaseProtocol(CoreProtocol): exc.__cause__ = cause self.waiter.set_exception(exc) self.waiter = None - self.timeout_callback = self.completed_callback = None cdef _set_server_parameter(self, name, val): self.settings.add_setting(name, val) @@ -756,8 +752,8 @@ cdef class BaseProtocol(CoreProtocol): self.waiter = self.create_future() if timeout is not None: self.timeout_handle = self.loop.call_later( - timeout, self.timeout_callback, self.waiter) - self.waiter.add_done_callback(self.completed_callback) + timeout, self._on_timeout, self.waiter) + self.waiter.add_done_callback(self._on_waiter_completed) return self.waiter cdef _on_result__connect(self, object waiter): From b8e68333715c95cddb46884010179621178a6730 Mon Sep 17 00:00:00 2001 From: pteromys Date: Fri, 14 Jul 2023 14:38:43 -0400 Subject: [PATCH 3/4] Fix line length for flake8. --- tests/test_connect.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_connect.py b/tests/test_connect.py index 4bd1d4f0..1af074f1 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -1859,8 +1859,8 @@ async def _run_no_explicit_close_test(self): self.assertIsNone(conref()) self.assertTrue(proto.is_closed()) - # tick event loop so asyncio.selector_events._SelectorSocketTransport - # has a chance to close itself and remove its reference to proto + # tick event loop; asyncio.selector_events._SelectorSocketTransport + # needs a chance to close itself and remove its reference to proto await asyncio.sleep(0) protoref = weakref.ref(proto) del proto From ddb4505a5b5dc8cb94a5763fbf9bc5f711a455f9 Mon Sep 17 00:00:00 2001 From: Elvis Pranskevichus Date: Mon, 9 Oct 2023 12:25:23 -0700 Subject: [PATCH 4/4] Unset `transport` in `Protocol.abort()` --- asyncpg/protocol/protocol.pyx | 1 + 1 file changed, 1 insertion(+) diff --git a/asyncpg/protocol/protocol.pyx b/asyncpg/protocol/protocol.pyx index 24511c01..487259eb 100644 --- a/asyncpg/protocol/protocol.pyx +++ b/asyncpg/protocol/protocol.pyx @@ -582,6 +582,7 @@ cdef class BaseProtocol(CoreProtocol): self._handle_waiter_on_connection_lost(None) self._terminate() self.transport.abort() + self.transport = None @cython.iterable_coroutine async def close(self, timeout):