Skip to content

Commit

Permalink
Better fix for #2901 using a no-op comparable in the heap.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed May 22, 2024
1 parent 8182d2a commit f18e3a0
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 23 deletions.
17 changes: 11 additions & 6 deletions playhouse/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class Meta:
import functools
import heapq
import logging
import random
import threading
import time
from collections import namedtuple
Expand Down Expand Up @@ -68,6 +67,10 @@ class MaxConnectionsExceeded(ValueError): pass
PoolConnection = namedtuple('PoolConnection', ('timestamp', 'connection',
'checked_out'))

class _sentinel(object):
def __lt__(self, other):
return True


def locked(fn):
@functools.wraps(fn)
Expand Down Expand Up @@ -136,7 +139,8 @@ def _connect(self):
while True:
try:
# Remove the oldest connection from the heap.
ts, conn = heapq.heappop(self._connections)
ts, _, c_conn = heapq.heappop(self._connections)
conn = c_conn
key = self.conn_key(conn)
except IndexError:
ts = conn = None
Expand Down Expand Up @@ -167,7 +171,7 @@ def _connect(self):
len(self._in_use) >= self._max_connections):
raise MaxConnectionsExceeded('Exceeded maximum connections.')
conn = super(PooledDatabase, self)._connect()
ts = time.time() - random.random() / 1000
ts = time.time()
key = self.conn_key(conn)
logger.debug('Created new connection %s.', key)

Expand Down Expand Up @@ -198,7 +202,8 @@ def _close(self, conn, close_conn=False):
super(PooledDatabase, self)._close(conn)
elif self._can_reuse(conn):
logger.debug('Returning %s to pool.', key)
heapq.heappush(self._connections, (pool_conn.timestamp, conn))
heapq.heappush(self._connections,
(pool_conn.timestamp, _sentinel(), conn))
else:
logger.debug('Closed %s.', key)

Expand All @@ -224,7 +229,7 @@ def manual_close(self):
@locked
def close_idle(self):
# Close any open connections that are not currently in-use.
for _, conn in self._connections:
for _, _, conn in self._connections:
self._close(conn, close_conn=True)
self._connections = []

Expand All @@ -249,7 +254,7 @@ def close_all(self):
# Close all connections -- available and in-use. Warning: may break any
# active connections used by other threads.
self.close()
for _, conn in self._connections:
for _, _, conn in self._connections:
self._close(conn, close_conn=True)
for pool_conn in self._in_use.values():
self._close(pool_conn.connection, close_conn=True)
Expand Down
34 changes: 17 additions & 17 deletions tests/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def open_conn():

self.assertEqual(db.counter, 5)
self.assertEqual(
sorted([conn for _, conn in db._connections]),
sorted([conn for _, _, conn in db._connections]),
[1, 2, 3, 4, 5]) # All 5 are ready to be re-used.
self.assertEqual(db._in_use, {})

Expand Down Expand Up @@ -184,9 +184,9 @@ def test_close_idle(self):
db = FakePooledDatabase('testing', counter=3)

now = time.time()
heapq.heappush(db._connections, (now - 10, 3))
heapq.heappush(db._connections, (now - 5, 2))
heapq.heappush(db._connections, (now - 1, 1))
heapq.heappush(db._connections, (now - 10, None, 3))
heapq.heappush(db._connections, (now - 5, None, 2))
heapq.heappush(db._connections, (now - 1, None, 1))

self.assertEqual(db.connection(), 3)
self.assertTrue(3 in db._in_use)
Expand Down Expand Up @@ -218,9 +218,9 @@ def test_close_all(self):
db = FakePooledDatabase('testing', counter=3)

now = time.time()
heapq.heappush(db._connections, (now - 10, 3))
heapq.heappush(db._connections, (now - 5, 2))
heapq.heappush(db._connections, (now - 1, 1))
heapq.heappush(db._connections, (now - 10, None, 3))
heapq.heappush(db._connections, (now - 5, None, 2))
heapq.heappush(db._connections, (now - 1, None, 1))
self.assertEqual(db.connection(), 3)
self.assertTrue(3 in db._in_use)

Expand All @@ -234,18 +234,18 @@ def test_stale_timeout_cascade(self):
now = time.time()
db = FakePooledDatabase('testing', stale_timeout=10)
conns = [
(now - 20, 1),
(now - 15, 2),
(now - 5, 3),
(now, 4),
(now - 20, None, 1),
(now - 15, None, 2),
(now - 5, None, 3),
(now, None, 4),
]
for ts_conn in conns:
heapq.heappush(db._connections, ts_conn)

self.assertEqual(db.connection(), 3)
self.assertEqual(len(db._in_use), 1)
self.assertTrue(3 in db._in_use)
self.assertEqual(db._connections, [(now, 4)])
self.assertEqual(db._connections, [(now, None, 4)])

def test_connect_cascade(self):
now = time.time()
Expand All @@ -256,10 +256,10 @@ def _is_closed(self, conn):
db = ClosedPooledDatabase('testing', stale_timeout=10)

conns = [
(now - 15, 1), # Skipped due to being stale.
(now - 5, 2), # Will appear closed.
(now - 3, 3),
(now, 4), # Will appear closed.
(now - 15, None, 1), # Skipped due to being stale.
(now - 5, None, 2), # Will appear closed.
(now - 3, None, 3),
(now, None, 4), # Will appear closed.
]
db.counter = 4 # The next connection we create will have id=5.
for ts_conn in conns:
Expand All @@ -272,7 +272,7 @@ def _is_closed(self, conn):
pool_conn = db._in_use[3]
self.assertEqual(pool_conn.timestamp, now - 3)
self.assertEqual(pool_conn.connection, 3)
self.assertEqual(db._connections, [(now, 4)])
self.assertEqual(db._connections, [(now, None, 4)])

# Since conn 4 is closed, we will open a new conn.
db._state.closed = True # Pretend we're in a different thread.
Expand Down

0 comments on commit f18e3a0

Please sign in to comment.