Skip to content

Commit 86c8a50

Browse files
committed
Fix leaky connections
Fixes #221 It's possible to trigger more approvals than are necessary, in turn grabbing more connections than we need. This happens when we drop a connection. The drop produces a notify, which doesn't get used until the pool is empty. The first `Pool::get()` call on an empty pool will spawn an connect task, immediately complete `notify.notified().await`, then spawn a second connect task. Both will connect and we'll end up with 1 more connection than we need. Rather than address the notify issue directly, this fix introduces some bookkeeping that tracks the number of open `pool.get()` requests we have waiting on connections. If the number of pending connections >= the number of pending gets, we will not spawn any additional connect tasks.
1 parent cb99697 commit 86c8a50

File tree

3 files changed

+66
-1
lines changed

3 files changed

+66
-1
lines changed

bb8/src/inner.rs

+1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ where
9191
let mut wait_time_start = None;
9292

9393
let future = async {
94+
let _guard = self.inner.request();
9495
loop {
9596
let (conn, approvals) = self.inner.pop();
9697
self.spawn_replenishing_approvals(approvals);

bb8/src/internals.rs

+31-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,27 @@ where
2222
pub(crate) statistics: AtomicStatistics,
2323
}
2424

25+
pub(crate) struct GetGuard<M: ManageConnection + Send> {
26+
inner: Arc<SharedPool<M>>,
27+
}
28+
29+
impl<M: ManageConnection + Send> GetGuard<M> {
30+
fn new(inner: Arc<SharedPool<M>>) -> Self {
31+
{
32+
let mut locked = inner.internals.lock();
33+
locked.inflight_gets += 1;
34+
}
35+
GetGuard { inner }
36+
}
37+
}
38+
39+
impl<M: ManageConnection + Send> Drop for GetGuard<M> {
40+
fn drop(&mut self) {
41+
let mut locked = self.inner.internals.lock();
42+
locked.inflight_gets -= 1;
43+
}
44+
}
45+
2546
impl<M> SharedPool<M>
2647
where
2748
M: ManageConnection + Send,
@@ -41,12 +62,19 @@ where
4162
let conn = locked.conns.pop_front().map(|idle| idle.conn);
4263
let approvals = match &conn {
4364
Some(_) => locked.wanted(&self.statics),
44-
None => locked.approvals(&self.statics, 1),
65+
None => {
66+
let approvals = min(1, locked.inflight_gets.saturating_sub(locked.pending_conns));
67+
locked.approvals(&self.statics, approvals)
68+
}
4569
};
4670

4771
(conn, approvals)
4872
}
4973

74+
pub(crate) fn request(self: &Arc<Self>) -> GetGuard<M> {
75+
GetGuard::new(self.clone())
76+
}
77+
5078
pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
5179
let mut locked = self.internals.lock();
5280
let mut approvals = locked.approvals(&self.statics, 1);
@@ -81,6 +109,7 @@ where
81109
conns: VecDeque<IdleConn<M::Connection>>,
82110
num_conns: u32,
83111
pending_conns: u32,
112+
inflight_gets: u32,
84113
}
85114

86115
impl<M> PoolInternals<M>
@@ -202,6 +231,7 @@ where
202231
conns: VecDeque::new(),
203232
num_conns: 0,
204233
pending_conns: 0,
234+
inflight_gets: 0,
205235
}
206236
}
207237
}

bb8/tests/test.rs

+34
Original file line numberDiff line numberDiff line change
@@ -1068,3 +1068,37 @@ async fn test_add_checks_broken_connections() {
10681068
let res = pool.add(conn);
10691069
assert!(matches!(res, Err(AddError::Broken(_))));
10701070
}
1071+
1072+
#[tokio::test]
1073+
async fn test_reuse_on_drop() {
1074+
let pool = Pool::builder()
1075+
.min_idle(0)
1076+
.max_size(100)
1077+
.queue_strategy(QueueStrategy::Lifo)
1078+
.build(OkManager::<FakeConnection>::new())
1079+
.await
1080+
.unwrap();
1081+
1082+
// The first get should
1083+
// 1) see nothing in the pool,
1084+
// 2) spawn a single replenishing approval,
1085+
// 3) get notified of the new connection and grab it from the pool
1086+
let conn_0 = pool.get().await.expect("should connect");
1087+
// Dropping the connection queues up a notify
1088+
drop(conn_0);
1089+
// The second get should
1090+
// 1) see the first connection in the pool and grab it
1091+
let _conn_1: PooledConnection<OkManager<FakeConnection>> =
1092+
pool.get().await.expect("should connect");
1093+
// The third get will
1094+
// 1) see nothing in the pool,
1095+
// 2) spawn a single replenishing approval,
1096+
// 3) get notified of the new connection,
1097+
// 4) see nothing in the pool,
1098+
// 5) _not_ spawn a single replenishing approval,
1099+
// 6) get notified of the new connection and grab it from the pool
1100+
let _conn_2: PooledConnection<OkManager<FakeConnection>> =
1101+
pool.get().await.expect("should connect");
1102+
1103+
assert_eq!(pool.state().connections, 2);
1104+
}

0 commit comments

Comments
 (0)