Skip to content

Commit

Permalink
fix(client): fix a rare connection pool race condition
Browse files Browse the repository at this point in the history
It's possible for `PoolInner::put` to happen between `Pool::take` and `Pool::waiter`. This merges `take` and `waiter` into using the same lock.
  • Loading branch information
quininer authored and seanmonstar committed Apr 23, 2019
1 parent 0c1e182 commit 4133181
Showing 1 changed file with 48 additions and 64 deletions.
112 changes: 48 additions & 64 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,41 +198,6 @@ impl<T: Poolable> Pool<T> {
.unwrap_or(0)
}

fn take(&self, key: &Key) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.as_ref()?.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let maybe_entry = inner.idle.get_mut(key)
.and_then(|list| {
trace!("take? {:?}: expiration = {:?}", key, expiration.0);
// A block to end the mutable borrow on list,
// so the map below can check is_empty()
{
let popper = IdlePopper {
key,
list,
};
popper.pop(&expiration)
}
.map(|e| (e, list.is_empty()))
});

let (entry, empty) = if let Some((e, empty)) = maybe_entry {
(Some(e), empty)
} else {
// No entry found means nuke the list for sure.
(None, true)
};
if empty {
//TODO: This could be done with the HashMap::entry API instead.
inner.idle.remove(key);
}
entry
};

entry.map(|e| self.reuse(key, e.value))
}

pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
let (value, pool_ref) = if let Some(ref enabled) = self.inner {
match value.reserve() {
Expand Down Expand Up @@ -296,23 +261,6 @@ impl<T: Poolable> Pool<T> {
value: Some(value),
}
}

fn waiter(&self, key: Key, tx: oneshot::Sender<T>) {
debug_assert!(
self.is_enabled(),
"pool.waiter() should not be called if disabled",
);
trace!("checkout waiting for idle connection: {:?}", key);
self.inner
.as_ref()
.expect("pool.waiter() expects pool is enabled")
.lock()
.unwrap()
.waiters
.entry(key)
.or_insert(VecDeque::new())
.push_back(tx);
}
}

/// Pop off this list, looking for a usable connection that hasn't expired.
Expand Down Expand Up @@ -643,15 +591,54 @@ impl<T: Poolable> Checkout<T> {
}
}

fn add_waiter(&mut self) {
debug_assert!(self.pool.is_enabled());
fn checkout(&mut self) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let maybe_entry = inner.idle.get_mut(&self.key)
.and_then(|list| {
trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
// A block to end the mutable borrow on list,
// so the map below can check is_empty()
{
let popper = IdlePopper {
key: &self.key,
list,
};
popper.pop(&expiration)
}
.map(|e| (e, list.is_empty()))
});

if self.waiter.is_none() {
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task
self.pool.waiter(self.key.clone(), tx);
self.waiter = Some(rx);
}
let (entry, empty) = if let Some((e, empty)) = maybe_entry {
(Some(e), empty)
} else {
// No entry found means nuke the list for sure.
(None, true)
};
if empty {
//TODO: This could be done with the HashMap::entry API instead.
inner.idle.remove(&self.key);
}

if entry.is_none() && self.waiter.is_none() {
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task

trace!("checkout waiting for idle connection: {:?}", self.key);
inner
.waiters
.entry(self.key.clone())
.or_insert(VecDeque::new())
.push_back(tx);

self.waiter = Some(rx);
}

entry
};

entry.map(|e| self.pool.reuse(&self.key, e.value))
}
}

Expand All @@ -664,14 +651,11 @@ impl<T: Poolable> Future for Checkout<T> {
return Ok(Async::Ready(pooled));
}

let entry = self.pool.take(&self.key);

if let Some(pooled) = entry {
if let Some(pooled) = self.checkout() {
Ok(Async::Ready(pooled))
} else if !self.pool.is_enabled() {
Err(::Error::new_canceled(Some("pool is disabled")))
} else {
self.add_waiter();
Ok(Async::NotReady)
}
}
Expand Down

0 comments on commit 4133181

Please sign in to comment.