diff --git a/Cargo.toml b/Cargo.toml index 33c84f43de..7383325e94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ tokio-proto = { version = "0.1", optional = true } tokio-service = "0.1" tokio-io = "0.1" unicase = "2.0" +want = "0.0.4" [dev-dependencies] num_cpus = "1.0" diff --git a/src/client/conn.rs b/src/client/conn.rs index 5f44a11e3a..40acfff901 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -129,10 +129,6 @@ impl SendRequest pub(super) fn is_ready(&self) -> bool { self.dispatch.is_ready() } - - pub(super) fn is_closed(&self) -> bool { - self.dispatch.is_closed() - } } impl SendRequest diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index cc5e345710..cc13e6bc26 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,8 +1,8 @@ use futures::{Async, Poll, Stream}; use futures::sync::{mpsc, oneshot}; +use want; use common::Never; -use super::signal; //pub type Callback = oneshot::Sender)>>; pub type RetryPromise = oneshot::Receiver)>>; @@ -10,7 +10,7 @@ pub type Promise = oneshot::Receiver>; pub fn channel() -> (Sender, Receiver) { let (tx, rx) = mpsc::channel(0); - let (giver, taker) = signal::new(); + let (giver, taker) = want::new(); let tx = Sender { giver: giver, inner: tx, @@ -27,7 +27,7 @@ pub struct Sender { // when the queue is empty. This helps us know when a request and // response have been fully processed, and a connection is ready // for more. - giver: signal::Giver, + giver: want::Giver, inner: mpsc::Sender<(T, Callback)>, } @@ -49,11 +49,8 @@ impl Sender { self.giver.is_wanting() } - pub fn is_closed(&self) -> bool { - self.giver.is_canceled() - } - pub fn try_send(&mut self, val: T) -> Result, T> { + self.giver.give(); let (tx, rx) = oneshot::channel(); self.inner.try_send((val, Callback::Retry(tx))) .map(move |_| rx) @@ -61,6 +58,7 @@ impl Sender { } pub fn send(&mut self, val: T) -> Result, T> { + self.giver.give(); let (tx, rx) = oneshot::channel(); self.inner.try_send((val, Callback::NoRetry(tx))) .map(move |_| rx) @@ -70,7 +68,7 @@ impl Sender { pub struct Receiver { inner: mpsc::Receiver<(T, Callback)>, - taker: signal::Taker, + taker: want::Taker, } impl Stream for Receiver { diff --git a/src/client/mod.rs b/src/client/mod.rs index 96a2b216bb..cc53924dc5 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -37,7 +37,6 @@ mod dns; mod pool; #[cfg(feature = "compat")] pub mod compat; -mod signal; #[cfg(test)] mod tests; @@ -262,13 +261,22 @@ where C: Connect, // If the executor doesn't have room, oh well. Things will likely // be blowing up soon, but this specific task isn't required. let _ = executor.execute(future::poll_fn(move || { - pooled.tx.poll_ready().map_err(|_| ()) + pooled.tx.poll_ready() }).then(move |_| { // At this point, `pooled` is dropped, and had a chance // to insert into the pool (if conn was idle) drop(delayed_tx); Ok(()) })); + } else { + // There's no body to delay, but the connection isn't + // ready yet. Only re-insert when it's ready... + let _ = executor.execute( + future::poll_fn(move || { + pooled.tx.poll_ready() + }) + .then(|_| Ok(())) + ); } res @@ -395,8 +403,8 @@ impl self::pool::Closed for PoolClient where B: 'static, { - fn is_closed(&self) -> bool { - self.tx.is_closed() + fn is_open(&self) -> bool { + self.tx.is_ready() } } diff --git a/src/client/pool.rs b/src/client/pool.rs index 7531b77e44..fac134bdeb 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -18,7 +18,7 @@ pub struct Pool { // // See https://github.com/hyperium/hyper/issues/1429 pub trait Closed { - fn is_closed(&self) -> bool; + fn is_open(&self) -> bool; } struct PoolInner { @@ -77,7 +77,7 @@ impl Pool { trace!("take; url = {:?}, expiration = {:?}", key, expiration.0); while let Some(entry) = list.pop() { if !expiration.expires(entry.idle_at) { - if !entry.value.is_closed() { + if entry.value.is_open() { should_remove = list.is_empty(); return Some(entry); } @@ -205,7 +205,7 @@ impl PoolInner { self.idle.retain(|_key, values| { values.retain(|entry| { - if entry.value.is_closed() { + if !entry.value.is_open() { return false; } now - entry.idle_at < dur @@ -293,6 +293,11 @@ impl DerefMut for Pooled { impl Drop for Pooled { fn drop(&mut self) { if let Some(value) = self.value.take() { + if !value.is_open() { + // don't ever re-insert not-open connections back + // into the pool! + return; + } if let Some(inner) = self.pool.upgrade() { if let Ok(mut inner) = inner.lock() { inner.put(self.key.clone(), value); @@ -331,7 +336,7 @@ impl Checkout { if let Some(ref mut rx) = self.parked { match rx.poll() { Ok(Async::Ready(value)) => { - if !value.is_closed() { + if value.is_open() { return Ok(Async::Ready(self.pool.reuse(&self.key, value))); } drop_parked = true; @@ -434,8 +439,8 @@ mod tests { use super::{Closed, Pool}; impl Closed for i32 { - fn is_closed(&self) -> bool { - false + fn is_open(&self) -> bool { + true } } diff --git a/src/client/signal.rs b/src/client/signal.rs deleted file mode 100644 index 7fd6bb4490..0000000000 --- a/src/client/signal.rs +++ /dev/null @@ -1,192 +0,0 @@ -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use futures::{Async, Poll}; -use futures::task::{self, Task}; - -use self::lock::Lock; - -pub fn new() -> (Giver, Taker) { - let inner = Arc::new(Inner { - state: AtomicUsize::new(STATE_IDLE), - task: Lock::new(None), - }); - let inner2 = inner.clone(); - ( - Giver { - inner: inner, - }, - Taker { - inner: inner2, - }, - ) -} - -#[derive(Clone)] -pub struct Giver { - inner: Arc, -} - -pub struct Taker { - inner: Arc, -} - -const STATE_IDLE: usize = 0; -const STATE_WANT: usize = 1; -const STATE_GIVE: usize = 2; -const STATE_CLOSED: usize = 3; - -struct Inner { - state: AtomicUsize, - task: Lock>, -} - -impl Giver { - pub fn poll_want(&mut self) -> Poll<(), ()> { - loop { - let state = self.inner.state.load(Ordering::SeqCst); - match state { - STATE_WANT => { - // only set to IDLE if it is still Want - self.inner.state.compare_and_swap( - STATE_WANT, - STATE_IDLE, - Ordering::SeqCst, - ); - return Ok(Async::Ready(())) - }, - STATE_GIVE => { - // we're already waiting, return - return Ok(Async::NotReady) - } - STATE_CLOSED => return Err(()), - // Taker doesn't want anything yet, so park. - _ => { - if let Some(mut locked) = self.inner.task.try_lock() { - - // While we have the lock, try to set to GIVE. - let old = self.inner.state.compare_and_swap( - STATE_IDLE, - STATE_GIVE, - Ordering::SeqCst, - ); - // If it's not still IDLE, something happened! - // Go around the loop again. - if old == STATE_IDLE { - *locked = Some(task::current()); - return Ok(Async::NotReady) - } - } else { - // if we couldn't take the lock, then a Taker has it. - // The *ONLY* reason is because it is in the process of notifying us - // of its want. - // - // We need to loop again to see what state it was changed to. - } - }, - } - } - } - - pub fn is_wanting(&self) -> bool { - self.inner.state.load(Ordering::SeqCst) == STATE_WANT - } - - pub fn is_canceled(&self) -> bool { - self.inner.state.load(Ordering::SeqCst) == STATE_CLOSED - } -} - -impl Taker { - pub fn cancel(&self) { - self.signal(STATE_CLOSED) - } - - pub fn want(&self) { - self.signal(STATE_WANT) - } - - fn signal(&self, state: usize) { - let old_state = self.inner.state.swap(state, Ordering::SeqCst); - match old_state { - STATE_WANT | STATE_CLOSED | STATE_IDLE => (), - _ => { - loop { - if let Some(mut locked) = self.inner.task.try_lock() { - if let Some(task) = locked.take() { - task.notify(); - } - return; - } else { - // if we couldn't take the lock, then a Giver has it. - // The *ONLY* reason is because it is in the process of parking. - // - // We need to loop and take the lock so we can notify this task. - } - } - }, - } - } -} - -impl Drop for Taker { - fn drop(&mut self) { - self.cancel(); - } -} - - -// a sub module just to protect unsafety -mod lock { - use std::cell::UnsafeCell; - use std::ops::{Deref, DerefMut}; - use std::sync::atomic::{AtomicBool, Ordering}; - - pub struct Lock { - is_locked: AtomicBool, - value: UnsafeCell, - } - - impl Lock { - pub fn new(val: T) -> Lock { - Lock { - is_locked: AtomicBool::new(false), - value: UnsafeCell::new(val), - } - } - - pub fn try_lock(&self) -> Option> { - if !self.is_locked.swap(true, Ordering::SeqCst) { - Some(Locked { lock: self }) - } else { - None - } - } - } - - unsafe impl Send for Lock {} - unsafe impl Sync for Lock {} - - pub struct Locked<'a, T: 'a> { - lock: &'a Lock, - } - - impl<'a, T> Deref for Locked<'a, T> { - type Target = T; - fn deref(&self) -> &T { - unsafe { &*self.lock.value.get() } - } - } - - impl<'a, T> DerefMut for Locked<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.lock.value.get() } - } - } - - impl<'a, T> Drop for Locked<'a, T> { - fn drop(&mut self) { - self.lock.is_locked.store(false, Ordering::SeqCst); - } - } -} diff --git a/src/lib.rs b/src/lib.rs index a412fdc52c..9ddac9534d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ extern crate tokio_core as tokio; extern crate tokio_proto; extern crate tokio_service; extern crate unicase; +extern crate want; #[cfg(all(test, feature = "nightly"))] extern crate test;