-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This adds a connection pool to the Client that is used by default. It accepts any other NetworkConnector, and simply acts as a NetworkConnector itself. Other Pools can exist by simply providing a custom NetworkConnector. This Pool is only used by default if you also use the default connector, which is `HttpConnector`. If you wish to use the Pool with a custom connector, you'll need to create the Pool with your custom connector, and then pass that pool to the Client::with_connector. This also adds a method to `NetworkStream`, `close`, which can be used to know when the Stream should be put down, because a server requested that the connection close instead of be kept alive. Closes #363 Closes #41
- Loading branch information
1 parent
9d83ed6
commit e9c7133
Showing
7 changed files
with
281 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
//! Client Connection Pooling | ||
use std::borrow::ToOwned; | ||
use std::collections::HashMap; | ||
use std::io::{self, Read, Write}; | ||
use std::net::{SocketAddr, Shutdown}; | ||
use std::sync::{Arc, Mutex}; | ||
|
||
use net::{NetworkConnector, NetworkStream, HttpConnector}; | ||
|
||
/// The `NetworkConnector` that behaves as a connection pool used by hyper's `Client`. | ||
pub struct Pool<C: NetworkConnector> { | ||
connector: C, | ||
inner: Arc<Mutex<PoolImpl<<C as NetworkConnector>::Stream>>> | ||
} | ||
|
||
/// Config options for the `Pool`. | ||
#[derive(Debug)] | ||
pub struct Config { | ||
/// The maximum idle connections *per host*. | ||
pub max_idle: usize, | ||
} | ||
|
||
impl Default for Config { | ||
#[inline] | ||
fn default() -> Config { | ||
Config { | ||
max_idle: 5, | ||
} | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
struct PoolImpl<S> { | ||
conns: HashMap<Key, Vec<S>>, | ||
config: Config, | ||
} | ||
|
||
type Key = (String, u16, Scheme); | ||
|
||
fn key<T: Into<Scheme>>(host: &str, port: u16, scheme: T) -> Key { | ||
(host.to_owned(), port, scheme.into()) | ||
} | ||
|
||
#[derive(Clone, PartialEq, Eq, Debug, Hash)] | ||
enum Scheme { | ||
Http, | ||
Https, | ||
Other(String) | ||
} | ||
|
||
impl<'a> From<&'a str> for Scheme { | ||
fn from(s: &'a str) -> Scheme { | ||
match s { | ||
"http" => Scheme::Http, | ||
"https" => Scheme::Https, | ||
s => Scheme::Other(String::from(s)) | ||
} | ||
} | ||
} | ||
|
||
impl Pool<HttpConnector> { | ||
/// Creates a `Pool` with an `HttpConnector`. | ||
#[inline] | ||
pub fn new(config: Config) -> Pool<HttpConnector> { | ||
Pool::with_connector(config, HttpConnector(None)) | ||
} | ||
} | ||
|
||
impl<C: NetworkConnector> Pool<C> { | ||
/// Creates a `Pool` with a specified `NetworkConnector`. | ||
#[inline] | ||
pub fn with_connector(config: Config, connector: C) -> Pool<C> { | ||
Pool { | ||
connector: connector, | ||
inner: Arc::new(Mutex::new(PoolImpl { | ||
conns: HashMap::new(), | ||
config: config, | ||
})) | ||
} | ||
} | ||
|
||
/// Clear all idle connections from the Pool, closing them. | ||
#[inline] | ||
pub fn clear_idle(&mut self) { | ||
self.inner.lock().unwrap().conns.clear(); | ||
} | ||
} | ||
|
||
impl<S> PoolImpl<S> { | ||
fn reuse(&mut self, key: Key, conn: S) { | ||
trace!("reuse {:?}", key); | ||
let conns = self.conns.entry(key).or_insert(vec![]); | ||
if conns.len() < self.config.max_idle { | ||
conns.push(conn); | ||
} | ||
} | ||
} | ||
|
||
impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector for Pool<C> { | ||
type Stream = PooledStream<S>; | ||
fn connect(&mut self, host: &str, port: u16, scheme: &str) -> io::Result<PooledStream<S>> { | ||
let key = key(host, port, scheme); | ||
let mut locked = self.inner.lock().unwrap(); | ||
let mut should_remove = false; | ||
let conn = match locked.conns.get_mut(&key) { | ||
Some(ref mut vec) => { | ||
should_remove = vec.len() == 1; | ||
vec.pop().unwrap() | ||
} | ||
_ => try!(self.connector.connect(host, port, scheme)) | ||
}; | ||
if should_remove { | ||
locked.conns.remove(&key); | ||
} | ||
Ok(PooledStream { | ||
inner: Some((key, conn)), | ||
is_closed: false, | ||
is_drained: false, | ||
pool: self.inner.clone() | ||
}) | ||
} | ||
} | ||
|
||
/// A Stream that will try to be returned to the Pool when dropped. | ||
pub struct PooledStream<S> { | ||
inner: Option<(Key, S)>, | ||
is_closed: bool, | ||
is_drained: bool, | ||
pool: Arc<Mutex<PoolImpl<S>>> | ||
} | ||
|
||
impl<S: NetworkStream> Read for PooledStream<S> { | ||
#[inline] | ||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||
match self.inner.as_mut().unwrap().1.read(buf) { | ||
Ok(0) => { | ||
self.is_drained = true; | ||
Ok(0) | ||
} | ||
r => r | ||
} | ||
} | ||
} | ||
|
||
impl<S: NetworkStream> Write for PooledStream<S> { | ||
#[inline] | ||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { | ||
self.inner.as_mut().unwrap().1.write(buf) | ||
} | ||
|
||
#[inline] | ||
fn flush(&mut self) -> io::Result<()> { | ||
self.inner.as_mut().unwrap().1.flush() | ||
} | ||
} | ||
|
||
impl<S: NetworkStream> NetworkStream for PooledStream<S> { | ||
#[inline] | ||
fn peer_addr(&mut self) -> io::Result<SocketAddr> { | ||
self.inner.as_mut().unwrap().1.peer_addr() | ||
} | ||
|
||
#[inline] | ||
fn close(&mut self, how: Shutdown) -> io::Result<()> { | ||
self.is_closed = true; | ||
self.inner.as_mut().unwrap().1.close(how) | ||
} | ||
} | ||
|
||
impl<S> Drop for PooledStream<S> { | ||
fn drop(&mut self) { | ||
trace!("PooledStream.drop, is_closed={}, is_drained={}", self.is_closed, self.is_drained); | ||
if !self.is_closed && self.is_drained { | ||
self.inner.take().map(|(key, conn)| { | ||
if let Ok(mut pool) = self.pool.lock() { | ||
pool.reuse(key, conn); | ||
} | ||
// else poisoned, give up | ||
}); | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::net::Shutdown; | ||
use mock::MockConnector; | ||
use net::{NetworkConnector, NetworkStream}; | ||
|
||
use super::{Pool, key}; | ||
|
||
macro_rules! mocked { | ||
() => ({ | ||
Pool::with_connector(Default::default(), MockConnector) | ||
}) | ||
} | ||
|
||
#[test] | ||
fn test_connect_and_drop() { | ||
let mut pool = mocked!(); | ||
let key = key("127.0.0.1", 3000, "http"); | ||
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; | ||
{ | ||
let locked = pool.inner.lock().unwrap(); | ||
assert_eq!(locked.conns.len(), 1); | ||
assert_eq!(locked.conns.get(&key).unwrap().len(), 1); | ||
} | ||
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; //reused | ||
{ | ||
let locked = pool.inner.lock().unwrap(); | ||
assert_eq!(locked.conns.len(), 1); | ||
assert_eq!(locked.conns.get(&key).unwrap().len(), 1); | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_closed() { | ||
let mut pool = mocked!(); | ||
let mut stream = pool.connect("127.0.0.1", 3000, "http").unwrap(); | ||
stream.close(Shutdown::Both).unwrap(); | ||
drop(stream); | ||
let locked = pool.inner.lock().unwrap(); | ||
assert_eq!(locked.conns.len(), 0); | ||
} | ||
|
||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.