Skip to content

Commit

Permalink
fix(client): check conn is closed in expire interval
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Feb 28, 2018
1 parent 4aab54e commit 2fa0c84
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 24 deletions.
10 changes: 3 additions & 7 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,9 @@ impl<B> Clone for HyperClient<B> {
}
}

impl<B> self::pool::Ready for HyperClient<B> {
fn poll_ready(&mut self) -> Poll<(), ()> {
if self.tx.is_closed() {
Err(())
} else {
Ok(Async::Ready(()))
}
impl<B> self::pool::Closed for HyperClient<B> {
fn is_closed(&self) -> bool {
self.tx.is_closed()
}
}

Expand Down
39 changes: 22 additions & 17 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub struct Pool<T> {
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub trait Ready {
fn poll_ready(&mut self) -> Poll<(), ()>;
pub trait Closed {
fn is_closed(&self) -> bool;
}

struct PoolInner<T> {
Expand All @@ -49,7 +49,7 @@ struct PoolInner<T> {
expired_timer_spawned: bool,
}

impl<T: Clone + Ready> Pool<T> {
impl<T: Clone + Closed> Pool<T> {
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
Pool {
inner: Rc::new(RefCell::new(PoolInner {
Expand Down Expand Up @@ -117,10 +117,10 @@ impl<T: Clone + Ready> Pool<T> {
let mut should_remove = false;
let entry = inner.idle.get_mut(key).and_then(|list| {
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
while let Some(mut entry) = list.pop() {
while let Some(entry) = list.pop() {
match entry.status.get() {
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
if let Ok(Async::Ready(())) = entry.value.poll_ready() {
if !entry.value.is_closed() {
should_remove = list.is_empty();
return Some(entry);
}
Expand Down Expand Up @@ -202,7 +202,9 @@ impl<T> Pool<T> {
inner.parked.remove(key);
}
}
}

impl<T: Closed> Pool<T> {
fn clear_expired(&self) {
let mut inner = self.inner.borrow_mut();

Expand All @@ -218,6 +220,9 @@ impl<T> Pool<T> {
inner.idle.retain(|_key, values| {

values.retain(|val| {
if val.value.is_closed() {
return false;
}
match val.status.get() {
TimedKA::Idle(idle_at) if now - idle_at < dur => {
true
Expand All @@ -234,7 +239,7 @@ impl<T> Pool<T> {
}


impl<T: 'static> Pool<T> {
impl<T: Closed + 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
let mut inner = self.inner.borrow_mut();

Expand Down Expand Up @@ -296,7 +301,7 @@ impl<T> DerefMut for Pooled<T> {
}
}

impl<T: Clone + Ready> KeepAlive for Pooled<T> {
impl<T: Clone + Closed> KeepAlive for Pooled<T> {
fn busy(&mut self) {
self.entry.status.set(TimedKA::Busy);
}
Expand Down Expand Up @@ -347,7 +352,7 @@ impl<T> fmt::Debug for Pooled<T> {
}
}

impl<T: Clone + Ready> BitAndAssign<bool> for Pooled<T> {
impl<T: Clone + Closed> BitAndAssign<bool> for Pooled<T> {
fn bitand_assign(&mut self, enabled: bool) {
if !enabled {
self.disable();
Expand Down Expand Up @@ -377,13 +382,13 @@ pub struct Checkout<T> {

struct NotParked;

impl<T: Clone + Ready> Checkout<T> {
impl<T: Clone + Closed> Checkout<T> {
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
let mut drop_parked = false;
if let Some(ref mut rx) = self.parked {
match rx.poll() {
Ok(Async::Ready(mut entry)) => {
if let Ok(Async::Ready(())) = entry.value.poll_ready() {
if !entry.value.is_closed() {
return Ok(Async::Ready(self.pool.reuse(&self.key, entry)));
}
drop_parked = true;
Expand All @@ -408,7 +413,7 @@ impl<T: Clone + Ready> Checkout<T> {
}
}

impl<T: Clone + Ready> Future for Checkout<T> {
impl<T: Clone + Closed> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = io::Error;

Expand Down Expand Up @@ -456,7 +461,7 @@ struct IdleInterval<T> {
pool: Weak<RefCell<PoolInner<T>>>,
}

impl<T: 'static> Future for IdleInterval<T> {
impl<T: Closed + 'static> Future for IdleInterval<T> {
type Item = ();
type Error = ();

Expand All @@ -478,14 +483,14 @@ impl<T: 'static> Future for IdleInterval<T> {
mod tests {
use std::rc::Rc;
use std::time::Duration;
use futures::{Async, Future, Poll};
use futures::{Async, Future};
use futures::future;
use proto::KeepAlive;
use super::{Ready, Pool};
use super::{Closed, Pool};

impl Ready for i32 {
fn poll_ready(&mut self) -> Poll<(), ()> {
Ok(Async::Ready(()))
impl Closed for i32 {
fn is_closed(&self) -> bool {
false
}
}

Expand Down

0 comments on commit 2fa0c84

Please sign in to comment.