From 4133181bb20f8d7e990994b2119c590f832a95f1 Mon Sep 17 00:00:00 2001 From: quininer Date: Wed, 24 Apr 2019 04:55:34 +0800 Subject: [PATCH] fix(client): fix a rare connection pool race condition It's possible for `PoolInner::put` to happen between `Pool::take` and `Pool::waiter`. This merges `take` and `waiter` into using the same lock. --- src/client/pool.rs | 112 +++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 64 deletions(-) diff --git a/src/client/pool.rs b/src/client/pool.rs index 57e4bf35ec..40bf01f718 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -198,41 +198,6 @@ impl Pool { .unwrap_or(0) } - fn take(&self, key: &Key) -> Option> { - let entry = { - let mut inner = self.inner.as_ref()?.lock().unwrap(); - let expiration = Expiration::new(inner.timeout); - let maybe_entry = inner.idle.get_mut(key) - .and_then(|list| { - trace!("take? {:?}: expiration = {:?}", key, expiration.0); - // A block to end the mutable borrow on list, - // so the map below can check is_empty() - { - let popper = IdlePopper { - key, - list, - }; - popper.pop(&expiration) - } - .map(|e| (e, list.is_empty())) - }); - - let (entry, empty) = if let Some((e, empty)) = maybe_entry { - (Some(e), empty) - } else { - // No entry found means nuke the list for sure. - (None, true) - }; - if empty { - //TODO: This could be done with the HashMap::entry API instead. - inner.idle.remove(key); - } - entry - }; - - entry.map(|e| self.reuse(key, e.value)) - } - pub(super) fn pooled(&self, mut connecting: Connecting, value: T) -> Pooled { let (value, pool_ref) = if let Some(ref enabled) = self.inner { match value.reserve() { @@ -296,23 +261,6 @@ impl Pool { value: Some(value), } } - - fn waiter(&self, key: Key, tx: oneshot::Sender) { - debug_assert!( - self.is_enabled(), - "pool.waiter() should not be called if disabled", - ); - trace!("checkout waiting for idle connection: {:?}", key); - self.inner - .as_ref() - .expect("pool.waiter() expects pool is enabled") - .lock() - .unwrap() - .waiters - .entry(key) - .or_insert(VecDeque::new()) - .push_back(tx); - } } /// Pop off this list, looking for a usable connection that hasn't expired. @@ -643,15 +591,54 @@ impl Checkout { } } - fn add_waiter(&mut self) { - debug_assert!(self.pool.is_enabled()); + fn checkout(&mut self) -> Option> { + let entry = { + let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); + let expiration = Expiration::new(inner.timeout); + let maybe_entry = inner.idle.get_mut(&self.key) + .and_then(|list| { + trace!("take? {:?}: expiration = {:?}", self.key, expiration.0); + // A block to end the mutable borrow on list, + // so the map below can check is_empty() + { + let popper = IdlePopper { + key: &self.key, + list, + }; + popper.pop(&expiration) + } + .map(|e| (e, list.is_empty())) + }); - if self.waiter.is_none() { - let (tx, mut rx) = oneshot::channel(); - let _ = rx.poll(); // park this task - self.pool.waiter(self.key.clone(), tx); - self.waiter = Some(rx); - } + let (entry, empty) = if let Some((e, empty)) = maybe_entry { + (Some(e), empty) + } else { + // No entry found means nuke the list for sure. + (None, true) + }; + if empty { + //TODO: This could be done with the HashMap::entry API instead. + inner.idle.remove(&self.key); + } + + if entry.is_none() && self.waiter.is_none() { + let (tx, mut rx) = oneshot::channel(); + let _ = rx.poll(); // park this task + + trace!("checkout waiting for idle connection: {:?}", self.key); + inner + .waiters + .entry(self.key.clone()) + .or_insert(VecDeque::new()) + .push_back(tx); + + self.waiter = Some(rx); + } + + entry + }; + + entry.map(|e| self.pool.reuse(&self.key, e.value)) } } @@ -664,14 +651,11 @@ impl Future for Checkout { return Ok(Async::Ready(pooled)); } - let entry = self.pool.take(&self.key); - - if let Some(pooled) = entry { + if let Some(pooled) = self.checkout() { Ok(Async::Ready(pooled)) } else if !self.pool.is_enabled() { Err(::Error::new_canceled(Some("pool is disabled"))) } else { - self.add_waiter(); Ok(Async::NotReady) } }