diff --git a/playhouse/pool.py b/playhouse/pool.py index 62caece1a..d5a35eacf 100644 --- a/playhouse/pool.py +++ b/playhouse/pool.py @@ -34,7 +34,6 @@ class Meta: import functools import heapq import logging -import random import threading import time from collections import namedtuple @@ -68,6 +67,10 @@ class MaxConnectionsExceeded(ValueError): pass PoolConnection = namedtuple('PoolConnection', ('timestamp', 'connection', 'checked_out')) +class _sentinel(object): + def __lt__(self, other): + return True + def locked(fn): @functools.wraps(fn) @@ -136,7 +139,8 @@ def _connect(self): while True: try: # Remove the oldest connection from the heap. - ts, conn = heapq.heappop(self._connections) + ts, _, c_conn = heapq.heappop(self._connections) + conn = c_conn key = self.conn_key(conn) except IndexError: ts = conn = None @@ -167,7 +171,7 @@ def _connect(self): len(self._in_use) >= self._max_connections): raise MaxConnectionsExceeded('Exceeded maximum connections.') conn = super(PooledDatabase, self)._connect() - ts = time.time() - random.random() / 1000 + ts = time.time() key = self.conn_key(conn) logger.debug('Created new connection %s.', key) @@ -198,7 +202,8 @@ def _close(self, conn, close_conn=False): super(PooledDatabase, self)._close(conn) elif self._can_reuse(conn): logger.debug('Returning %s to pool.', key) - heapq.heappush(self._connections, (pool_conn.timestamp, conn)) + heapq.heappush(self._connections, + (pool_conn.timestamp, _sentinel(), conn)) else: logger.debug('Closed %s.', key) @@ -224,7 +229,7 @@ def manual_close(self): @locked def close_idle(self): # Close any open connections that are not currently in-use. - for _, conn in self._connections: + for _, _, conn in self._connections: self._close(conn, close_conn=True) self._connections = [] @@ -249,7 +254,7 @@ def close_all(self): # Close all connections -- available and in-use. Warning: may break any # active connections used by other threads. self.close() - for _, conn in self._connections: + for _, _, conn in self._connections: self._close(conn, close_conn=True) for pool_conn in self._in_use.values(): self._close(pool_conn.connection, close_conn=True) diff --git a/tests/pool.py b/tests/pool.py index 319afcdb0..cb5d16b20 100644 --- a/tests/pool.py +++ b/tests/pool.py @@ -112,7 +112,7 @@ def open_conn(): self.assertEqual(db.counter, 5) self.assertEqual( - sorted([conn for _, conn in db._connections]), + sorted([conn for _, _, conn in db._connections]), [1, 2, 3, 4, 5]) # All 5 are ready to be re-used. self.assertEqual(db._in_use, {}) @@ -184,9 +184,9 @@ def test_close_idle(self): db = FakePooledDatabase('testing', counter=3) now = time.time() - heapq.heappush(db._connections, (now - 10, 3)) - heapq.heappush(db._connections, (now - 5, 2)) - heapq.heappush(db._connections, (now - 1, 1)) + heapq.heappush(db._connections, (now - 10, None, 3)) + heapq.heappush(db._connections, (now - 5, None, 2)) + heapq.heappush(db._connections, (now - 1, None, 1)) self.assertEqual(db.connection(), 3) self.assertTrue(3 in db._in_use) @@ -218,9 +218,9 @@ def test_close_all(self): db = FakePooledDatabase('testing', counter=3) now = time.time() - heapq.heappush(db._connections, (now - 10, 3)) - heapq.heappush(db._connections, (now - 5, 2)) - heapq.heappush(db._connections, (now - 1, 1)) + heapq.heappush(db._connections, (now - 10, None, 3)) + heapq.heappush(db._connections, (now - 5, None, 2)) + heapq.heappush(db._connections, (now - 1, None, 1)) self.assertEqual(db.connection(), 3) self.assertTrue(3 in db._in_use) @@ -234,10 +234,10 @@ def test_stale_timeout_cascade(self): now = time.time() db = FakePooledDatabase('testing', stale_timeout=10) conns = [ - (now - 20, 1), - (now - 15, 2), - (now - 5, 3), - (now, 4), + (now - 20, None, 1), + (now - 15, None, 2), + (now - 5, None, 3), + (now, None, 4), ] for ts_conn in conns: heapq.heappush(db._connections, ts_conn) @@ -245,7 +245,7 @@ def test_stale_timeout_cascade(self): self.assertEqual(db.connection(), 3) self.assertEqual(len(db._in_use), 1) self.assertTrue(3 in db._in_use) - self.assertEqual(db._connections, [(now, 4)]) + self.assertEqual(db._connections, [(now, None, 4)]) def test_connect_cascade(self): now = time.time() @@ -256,10 +256,10 @@ def _is_closed(self, conn): db = ClosedPooledDatabase('testing', stale_timeout=10) conns = [ - (now - 15, 1), # Skipped due to being stale. - (now - 5, 2), # Will appear closed. - (now - 3, 3), - (now, 4), # Will appear closed. + (now - 15, None, 1), # Skipped due to being stale. + (now - 5, None, 2), # Will appear closed. + (now - 3, None, 3), + (now, None, 4), # Will appear closed. ] db.counter = 4 # The next connection we create will have id=5. for ts_conn in conns: @@ -272,7 +272,7 @@ def _is_closed(self, conn): pool_conn = db._in_use[3] self.assertEqual(pool_conn.timestamp, now - 3) self.assertEqual(pool_conn.connection, 3) - self.assertEqual(db._connections, [(now, 4)]) + self.assertEqual(db._connections, [(now, None, 4)]) # Since conn 4 is closed, we will open a new conn. db._state.closed = True # Pretend we're in a different thread.