Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Feb 28, 2018
1 parent ad77630 commit 1223fc2
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 4 deletions.
21 changes: 19 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,33 @@ impl<C, B> Client<C, B> {
Exec::Executor(..) => panic!("Client not built with a Handle"),
}
}
}


/// Create a new client with a specific connector.
impl<C, B> Client<C, B>
where C: Connect,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
// Create a new client with a specific connector.
#[inline]
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
Client {
let client = Client {
connector: Rc::new(config.connector),
executor: exec,
h1_writev: config.h1_writev,
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
retry_canceled_requests: config.retry_canceled_requests,
};

client.schedule_pool_timer();

client
}

fn schedule_pool_timer(&self) {
if let Exec::Handle(ref h) = self.executor {
self.pool.spawn_expired_interval(h);
}
}
}
Expand Down
99 changes: 97 additions & 2 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::ops::{Deref, DerefMut, BitAndAssign};
use std::rc::{Rc, Weak};
use std::time::{Duration, Instant};

use futures::{Future, Async, Poll};
use futures::{Future, Async, Poll, Stream};
use tokio::reactor::{Handle, Interval};
use relay;

use proto::{KeepAlive, KA};
Expand Down Expand Up @@ -194,6 +195,59 @@ impl<T> Pool<T> {
inner.parked.remove(key);
}
}

fn clear_expired(&self) {
let mut inner = self.inner.borrow_mut();

let dur = if let Some(dur) = inner.timeout {
dur
} else {
return
};

let now = Instant::now();
//self.last_idle_check_at = now;

inner.idle.retain(|_key, values| {

values.retain(|val| {
match val.status.get() {
TimedKA::Idle(idle_at) if now - idle_at < dur => {
true
},
_ => false,
}
//now - val.idle_at < dur
});

// returning false evicts this key/val
!values.is_empty()
});
}
}


impl<T: 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
let inner = self.inner.borrow();

if !inner.enabled {
return;
}

let dur = if let Some(dur) = inner.timeout {
dur
} else {
return
};

let interval = Interval::new(dur, handle)
.expect("reactor is gone");
handle.spawn(IdleInterval {
interval: interval,
pool: Rc::downgrade(&self.inner),
});
}
}

impl<T> Clone for Pool<T> {
Expand Down Expand Up @@ -385,6 +439,28 @@ impl Expiration {
}
}

struct IdleInterval<T> {
interval: Interval,
pool: Weak<RefCell<PoolInner<T>>>,
}

impl<T: 'static> Future for IdleInterval<T> {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));

if let Some(inner) = self.pool.upgrade() {
let pool = Pool { inner: inner };
pool.clear_expired();
} else {
return Ok(Async::Ready(()));
}
}
}
}

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -428,7 +504,7 @@ mod tests {
}

#[test]
fn test_pool_removes_expired() {
fn test_pool_checkout_removes_expired() {
let pool = Pool::new(true, Some(Duration::from_secs(1)));
let key = Rc::new("foo".to_string());

Expand All @@ -451,6 +527,25 @@ mod tests {
assert!(pool.inner.borrow().idle.get(&key).is_none());
}

#[test]
fn test_pool_timer_removes_expired() {
let pool = Pool::new(true, Some(Duration::from_secs(1)));
let key = Rc::new("foo".to_string());

let mut pooled1 = pool.pooled(key.clone(), 41);
pooled1.idle();
let mut pooled2 = pool.pooled(key.clone(), 5);
pooled2.idle();
let mut pooled3 = pool.pooled(key.clone(), 99);
pooled3.idle();

assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.inner.borrow().timeout.unwrap());

pool.clear_expired();
assert!(pool.inner.borrow().idle.get(&key).is_none());
}

#[test]
fn test_pool_checkout_task_unparked() {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
Expand Down

0 comments on commit 1223fc2

Please sign in to comment.