Skip to content

Commit 80d0a73

Browse files
committed
Fix djc#167: Notify waiters when dropping a bad connection from the pool
1 parent 8de2610 commit 80d0a73

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

bb8/src/inner.rs

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ where
141141
(_, _) => {
142142
let approvals = locked.dropped(1, &self.inner.statics);
143143
self.spawn_replenishing_approvals(approvals);
144+
self.inner.notify.notify_waiters();
144145
}
145146
}
146147
}

bb8/tests/test.rs

+58
Original file line numberDiff line numberDiff line change
@@ -831,3 +831,61 @@ async fn test_customize_connection_acquire() {
831831
let connection_1_or_2 = pool.get().await.unwrap();
832832
assert!(connection_1_or_2.custom_field == 1 || connection_1_or_2.custom_field == 2);
833833
}
834+
835+
#[tokio::test]
836+
async fn test_broken_connections_dont_starve_pool() {
837+
use std::sync::RwLock;
838+
use std::{convert::Infallible, time::Duration};
839+
840+
#[derive(Default)]
841+
struct ConnectionManager {
842+
counter: RwLock<u16>,
843+
}
844+
#[derive(Debug)]
845+
struct Connection;
846+
847+
#[async_trait::async_trait]
848+
impl bb8::ManageConnection for ConnectionManager {
849+
type Connection = Connection;
850+
type Error = Infallible;
851+
852+
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
853+
Ok(Connection)
854+
}
855+
856+
async fn is_valid(&self, _: &mut Self::Connection) -> Result<(), Self::Error> {
857+
Ok(())
858+
}
859+
860+
fn has_broken(&self, _: &mut Self::Connection) -> bool {
861+
let res = if *self.counter.read().unwrap() < 5 {
862+
true
863+
} else {
864+
false
865+
};
866+
*self.counter.write().unwrap() += 1;
867+
res
868+
}
869+
}
870+
871+
let pool = bb8::Pool::builder()
872+
.max_size(5)
873+
.connection_timeout(Duration::from_secs(10))
874+
.build(ConnectionManager::default())
875+
.await
876+
.unwrap();
877+
878+
let mut futures = Vec::new();
879+
880+
for i in 0..10 {
881+
let pool = pool.clone();
882+
futures.push(tokio::spawn(async move {
883+
let conn = pool.get().await.unwrap();
884+
drop(conn);
885+
}));
886+
}
887+
888+
for future in futures {
889+
future.await.unwrap();
890+
}
891+
}

0 commit comments

Comments
 (0)