diff --git a/.travis.yml b/.travis.yml index 30e0e3c4fc..5f414dd385 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,15 +2,10 @@ language: rust matrix: fast_finish: true include: - - os: osx - rust: stable - env: FEATURES="--no-default-features --features security-framework" - rust: nightly env: FEATURES="--features nightly" - rust: beta - rust: stable - - rust: stable - env: FEATURES="--no-default-features" cache: apt: true diff --git a/Cargo.toml b/Cargo.toml index 1374c04dc3..471db5ac2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,49 +7,34 @@ readme = "README.md" documentation = "http://hyperium.github.io/hyper" repository = "https://github.com/hyperium/hyper" license = "MIT" -authors = ["Sean McArthur ", - "Jonathan Reem "] +authors = ["Sean McArthur "] keywords = ["http", "hyper", "hyperium"] [dependencies] +futures = "0.1.7" +futures-cpupool = "0.1" httparse = "1.0" language-tags = "0.2" log = "0.3" mime = "0.2" -rotor = "0.6" +relay = "0.1" rustc-serialize = "0.3" -spmc = "0.2" time = "0.1" +tokio-core = "0.1" +tokio-proto = "0.1" +tokio-service = "0.1" unicase = "1.0" url = "1.0" -vecio = "0.1" [dependencies.cookie] version = "0.3" default-features = false -[dependencies.openssl] -version = "0.7" -optional = true - -[dependencies.openssl-verify] -version = "0.1" -optional = true - -[dependencies.security-framework] -version = "0.1.4" -optional = true - -[dependencies.serde] -version = "0.8" -optional = true - [dev-dependencies] -env_logger = "0.3" num_cpus = "1.0" +pretty_env_logger = "0.1" +spmc = "0.2" [features] -default = ["ssl"] -ssl = ["openssl", "openssl-verify"] -serde-serialization = ["serde", "mime/serde"] +default = [] nightly = [] diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs new file mode 100644 index 0000000000..50d0dc11fd --- /dev/null +++ b/benches/end_to_end.rs @@ -0,0 +1,61 @@ +#![feature(test)] + +extern crate futures; +extern crate hyper; +extern crate tokio_core; + +extern crate test; + +use futures::{Future, Stream}; +use tokio_core::reactor::Core; + +use hyper::header::{ContentLength, ContentType}; +use hyper::server::{Service, Request, Response}; + + +#[bench] +fn one_request_at_a_time(b: &mut test::Bencher) { + extern crate pretty_env_logger; + let _ = pretty_env_logger::init(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let addr = hyper::Server::http(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap() + .handle(|| Ok(Hello), &handle).unwrap(); + + let mut client = hyper::Client::new(&handle); + + let url: hyper::Url = format!("http://{}/get", addr).parse().unwrap(); + + b.bytes = 160; + b.iter(move || { + let work = client.get(url.clone()).and_then(|res| { + res.body().for_each(|_chunk| { + Ok(()) + }) + }); + + core.run(work).unwrap(); + }); +} + +static PHRASE: &'static [u8] = b"Hello, World!"; + +#[derive(Clone, Copy)] +struct Hello; + +impl Service for Hello { + type Request = Request; + type Response = Response; + type Error = hyper::Error; + type Future = ::futures::Finished; + fn call(&mut self, _req: Request) -> Self::Future { + ::futures::finished( + Response::new() + .with_header(ContentLength(PHRASE.len() as u64)) + .with_header(ContentType::plaintext()) + .with_body(PHRASE) + ) + } + +} diff --git a/doc/guide/server.md b/doc/guide/server.md index 5d2fdf4415..12b9de242e 100644 --- a/doc/guide/server.md +++ b/doc/guide/server.md @@ -1,387 +1,3 @@ % Server Guide -# Hello, World -Let's start off by creating a simple server to just serve a text response -of "Hello, World!" to every request. 
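(For reference, the tokio-based `Service` API that this change migrates to expresses the same "Hello, World!" server roughly as below — a sketch mirroring `examples/hello.rs` from this diff, with the elided generic parameters written out; the `Handler`/`Next` example that follows is the older API this guide documents.)

```no_run
extern crate futures;
extern crate hyper;

use hyper::header::{ContentLength, ContentType};
use hyper::server::{Server, Service, Request, Response};

static PHRASE: &'static [u8] = b"Hello, World!";

#[derive(Clone, Copy)]
struct Hello;

impl Service for Hello {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    // futures 0.1: a future that is immediately ready with the response
    type Future = ::futures::Finished<Response, hyper::Error>;

    fn call(&self, _req: Request) -> Self::Future {
        ::futures::finished(
            Response::new()
                .with_header(ContentLength(PHRASE.len() as u64))
                .with_header(ContentType::plaintext())
                .with_body(PHRASE)
        )
    }
}

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();
    // Server::standalone drives its own reactor, as in examples/hello.rs above
    let _server = Server::standalone(|tokio| {
        Server::http(&addr, tokio)?
            .handle(|| Ok(Hello), tokio)
    }).unwrap();
    println!("Listening on http://{}", addr);
}
```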
- -```no_run -extern crate hyper; -use hyper::{Decoder, Encoder, HttpStream as Http, Next}; -use hyper::server::{Server, Handler, Request, Response}; - -struct Text(&'static [u8]); - -impl Handler for Text { - fn on_request(&mut self, _req: Request) -> Next { - Next::write() - } - - fn on_request_readable(&mut self, _decoder: &mut Decoder) -> Next { - Next::write() - } - - fn on_response(&mut self, res: &mut Response) -> Next { - use hyper::header::ContentLength; - res.headers_mut().set(ContentLength(self.0.len() as u64)); - Next::write() - } - - fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { - encoder.write(self.0).unwrap(); // for now - Next::end() - } -} - -fn main() { - let addr = "127.0.0.1:0".parse().unwrap(); - let (listening, server) = Server::http(&addr).unwrap() - .handle(|_| Text(b"Hello, World")).unwrap(); - - println!("Listening on http://{}", listening); - server.run() -} -``` - -There is quite a few concepts here, so let's tackle them one by one. - -## Handler - -The [`Handler`][Handler] is how you define what should happen during the lifetime -of an HTTP message. We've implemented it for the `Text`, defining what should -happen at each event during an HTTP message. - -## Next - -Every event in the [`Handler`][Handler] returns a [`Next`][Next]. This signals -to hyper what the `Handler` would wishes to do next, and hyper will call the -appropriate method of the `Handler` when the action is ready again. - -So, in our "Hello World" server, you'll notice that when a request comes in, we -have no interest in the `Request` or its body. We immediately just wish to write -"Hello, World!", and be done. So, in `on_request`, we return `Next::write()`, -which tells hyper we wish to write the response. - -After `on_response` is called, we ask for `Next::write()` again, because we -still need to write the response body. hyper knows that the next time the -transport is ready to be written, since it already called `on_response`, it -will call `on_response_writable`, which is where we can write the text body. - -Once we're all done with the response, we can tell hyper to finish by returning -`Next::end()`. hyper will try to finish flushing all the output, and if the -conditions are met, it may try to use the underlying transport for another -request. This is also known as "keep-alive". - -## Server - -In the `main` function, a [`Server`][Server] is created that will utilize our -`Hello` handler. We use the default options, though you may wish to peruse -them, especially the `max_sockets` option, as it is conservative by default. - -We pass a constructor closure to `Server.handle`, which constructs a `Handler` -to be used for each incoming request. - -# Non-blocking IO - -## Don't Panic - -There is actually a very bad practice in the "Hello, World" example. The usage -of `decoder.write(x).unwrap()` will panic if the write operation fails. A panic -will take down the whole thread, which means the event loop and all other -in-progress requests. So don't do it. It's bad. - -What makes it worse, is that the write operation is much more likely to fail -when using non-blocking IO. If the write would block the thread, instead of -doing so, it will return an `io::Error` with the `kind` of `WouldBlock`. These -are expected errors. - -## WouldBlock - -Instead, we should inspect when there is a read or write error to see if the -`kind` was a `WouldBlock` error. 
Since `WouldBlock` is so common when using -non-blocking IO, the `Encoder` and `Decoder` provide `try_` methods that will -special case `WouldBlock`, allowing you to treat all `Err` cases as actual -errors. - -Additionally, it's possible there was a partial write of the response body, so -we should probably change the example to keep track of it's progress. Can you -see how we should change the example to better handle these conditions? - -This will just show the updated `on_response_writable` method, the rest stays -the same: - -```no_run -# extern crate hyper; -# use hyper::{Encoder, HttpStream as Http, Next}; - -# struct Text(&'static [u8]); - -# impl Text { - fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { - match encoder.try_write(self.0) { - Ok(Some(n)) => { - if n == self.0.len() { - // all done! - Next::end() - } else { - // a partial write! - // with a static array, we can just move our pointer - // another option could be to store a separate index field - self.0 = &self.0[n..]; - // there's still more to write, so ask to write again - Next::write() - } - }, - Ok(None) => { - // would block, ask to write again - Next::write() - }, - Err(e) => { - println!("oh noes, we cannot say hello! {}", e); - // remove (kill) this transport - Next::remove() - } - } - } -# } - -# fn main() {} -``` - -# Routing - -What if we wanted to serve different messages depending on the URL of the -request? Say, we wanted to respond with "Hello, World!" to `/hello`, but -"Good-bye" with `/bye`. Let's adjust our example to do that. - -```no_run -extern crate hyper; -use hyper::{Decoder, Encoder, HttpStream as Http, Next, StatusCode}; -use hyper::server::{Server, Handler, Request, Response}; - -struct Text(StatusCode, &'static [u8]); - -impl Handler for Text { - fn on_request(&mut self, req: Request) -> Next { - use hyper::RequestUri; - let path = match *req.uri() { - RequestUri::AbsolutePath { path: ref p, .. } => p, - RequestUri::AbsoluteUri(ref url) => url.path(), - // other 2 forms are for CONNECT and OPTIONS methods - _ => "" - }; - - match path { - "/hello" => { - self.1 = b"Hello, World!"; - }, - "/bye" => { - self.1 = b"Good-bye"; - }, - _ => { - self.0 = StatusCode::NotFound; - self.1 = b"Not Found"; - } - } - Next::write() - } - -# fn on_request_readable(&mut self, _decoder: &mut Decoder) -> Next { -# Next::write() -# } - - fn on_response(&mut self, res: &mut Response) -> Next { - use hyper::header::ContentLength; - // we send an HTTP Status Code, 200 OK, or 404 Not Found - res.set_status(self.0); - res.headers_mut().set(ContentLength(self.1.len() as u64)); - Next::write() - } - -# fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { -# match encoder.try_write(self.1) { -# Ok(Some(n)) => { -# if n == self.1.len() { -# Next::end() -# } else { -# self.1 = &self.1[n..]; -# Next::write() -# } -# }, -# Ok(None) => { -# Next::write() -# }, -# Err(e) => { -# println!("oh noes, we cannot say hello! {}", e); -# Next::remove() -# } -# } -# } -} - -fn main() { - let addr = "127.0.0.1:0".parse().unwrap(); - let (listening, server) = Server::http(&addr).unwrap() - .handle(|_| Text(StatusCode::Ok, b"")).unwrap(); - - println!("Listening on http://{}", listening); - server.run() -} -``` - -# Waiting - -More often than not, a server needs to something "expensive" before it can -provide a response to a request. 
This may be talking to a database, reading -a file, processing an image, sending its own HTTP request to another server, -or anything else that would impede the event loop thread. These sorts of actions -should be done off the event loop thread, when complete, should notify hyper -that it can now proceed. This is done by combining `Next::wait()` and the -[`Control`][Control]. - -## Control - -The `Control` is provided to the `Handler` constructor; it is the argument we -have so far been ignoring. It's not needed if we don't ever need to wait a -transport. The `Control` is usually sent to a queue, or another thread, or -wherever makes sense to be able to use it when the "blocking" operations are -complete. - -To focus on hyper instead of obscure blocking operations, we'll use this useless -sleeping thread to show it works. - -```no_run -extern crate hyper; - -use std::sync::mpsc; -use std::thread; -use std::time::Duration; - -use hyper::{Control, Next}; - -fn calculate_ultimate_question(rx: mpsc::Receiver<(Control, mpsc::Sender<&'static [u8]>)>) { - thread::spawn(move || { - while let Ok((ctrl, tx)) = rx.recv() { - thread::sleep(Duration::from_millis(500)); - tx.send(b"42").unwrap(); - ctrl.ready(Next::write()).unwrap(); - } - }); -} - -# fn main() {} -``` - -Our worker will spawn a thread that waits on messages. When receiving a message, -after a short nap, it will send back the "result" of the work, and wake up the -waiting transport with a `Next::write()` desire. - -## Wait - -Finally, let's tie in our worker thread into our `Text` handler: - -```no_run -extern crate hyper; -use hyper::{Control, Decoder, Encoder, HttpStream as Http, Next, StatusCode}; -use hyper::server::{Server, Handler, Request, Response}; - -use std::sync::mpsc; - -struct Text { - status: StatusCode, - text: &'static [u8], - control: Option, - worker_tx: mpsc::Sender<(Control, mpsc::Sender<&'static [u8]>)>, - worker_rx: Option>, -} - -impl Handler for Text { - fn on_request(&mut self, req: Request) -> Next { - use hyper::RequestUri; - let path = match *req.uri() { - RequestUri::AbsolutePath { path: ref p, .. 
} => p, - RequestUri::AbsoluteUri(ref url) => url.path(), - _ => "" - }; - - match path { - "/hello" => { - self.text = b"Hello, World!"; - }, - "/bye" => { - self.text = b"Good-bye"; - }, - "/question" => { - let (tx, rx) = mpsc::channel(); - // queue work on our worker - self.worker_tx.send((self.control.take().unwrap(), tx)).unwrap(); - // save receive channel for response handling - self.worker_rx = Some(rx); - // tell hyper we need to wait until we can continue - return Next::wait(); - } - _ => { - self.status = StatusCode::NotFound; - self.text = b"Not Found"; - } - } - Next::write() - } - -# fn on_request_readable(&mut self, _decoder: &mut Decoder) -> Next { -# Next::write() -# } -# - - fn on_response(&mut self, res: &mut Response) -> Next { - use hyper::header::ContentLength; - res.set_status(self.status); - if let Some(rx) = self.worker_rx.take() { - self.text = rx.recv().unwrap(); - } - res.headers_mut().set(ContentLength(self.text.len() as u64)); - Next::write() - } -# -# fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { -# unimplemented!() -# } -} - -# fn calculate_ultimate_question(rx: mpsc::Receiver<(Control, mpsc::Sender<&'static [u8]>)>) { -# use std::sync::mpsc; -# use std::thread; -# use std::time::Duration; -# thread::spawn(move || { -# while let Ok((ctrl, tx)) = rx.recv() { -# thread::sleep(Duration::from_millis(500)); -# tx.send(b"42").unwrap(); -# ctrl.ready(Next::write()).unwrap(); -# } -# }); -# } - -fn main() { - let (tx, rx) = mpsc::channel(); - calculate_ultimate_question(rx); - let addr = "127.0.0.1:0".parse().unwrap(); - let (listening, server) = Server::http(&addr).unwrap() - .handle(move |ctrl| Text { - status: StatusCode::Ok, - text: b"", - control: Some(ctrl), - worker_tx: tx.clone(), - worker_rx: None, - }).unwrap(); - - println!("Listening on http://{}", listening); - server.run() -} -``` - - - -[Control]: ../hyper/struct.Control.html -[Handler]: ../hyper/server/trait.Handler.html -[Next]: ../hyper/struct.Next.html -[Server]: ../hyper/server/struct.Server.html diff --git a/examples/client.rs b/examples/client.rs index e453656cce..698cbbfde1 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,68 +1,20 @@ #![deny(warnings)] +extern crate futures; extern crate hyper; +extern crate tokio_core; -extern crate env_logger; +extern crate pretty_env_logger; use std::env; -use std::io; -use std::sync::mpsc; -use std::time::Duration; +use std::io::{self, Write}; -use hyper::client::{Client, Request, Response, DefaultTransport as HttpStream}; -use hyper::header::Connection; -use hyper::{Decoder, Encoder, Next}; +use futures::Future; +use futures::stream::Stream; -#[derive(Debug)] -struct Dump(mpsc::Sender<()>); - -impl Drop for Dump { - fn drop(&mut self) { - let _ = self.0.send(()); - } -} - -fn read() -> Next { - Next::read().timeout(Duration::from_secs(10)) -} - -impl hyper::client::Handler for Dump { - fn on_request(&mut self, req: &mut Request) -> Next { - req.headers_mut().set(Connection::close()); - read() - } - - fn on_request_writable(&mut self, _encoder: &mut Encoder) -> Next { - read() - } - - fn on_response(&mut self, res: Response) -> Next { - println!("Response: {}", res.status()); - println!("Headers:\n{}", res.headers()); - read() - } - - fn on_response_readable(&mut self, decoder: &mut Decoder) -> Next { - match io::copy(decoder, &mut io::stdout()) { - Ok(0) => Next::end(), - Ok(_) => read(), - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => Next::read(), - _ => { - println!("ERROR:example: {}", e); - Next::end() 
- } - } - } - } - - fn on_error(&mut self, err: hyper::Error) -> Next { - println!("ERROR:example: {}", err); - Next::remove() - } -} +use hyper::Client; fn main() { - env_logger::init().unwrap(); + pretty_env_logger::init().unwrap(); let url = match env::args().nth(1) { Some(url) => url, @@ -72,11 +24,26 @@ fn main() { } }; - let (tx, rx) = mpsc::channel(); - let client = Client::new().expect("Failed to create a Client"); - client.request(url.parse().unwrap(), Dump(tx)).unwrap(); + let url = hyper::Url::parse(&url).unwrap(); + if url.scheme() != "http" { + println!("This example only works with 'http' URLs."); + return; + } + + let mut core = tokio_core::reactor::Core::new().unwrap(); + let handle = core.handle(); + let client = Client::new(&handle); + + let work = client.get(url).and_then(|res| { + println!("Response: {}", res.status()); + println!("Headers: \n{}", res.headers()); + + res.body().for_each(|chunk| { + io::stdout().write_all(&chunk).map_err(From::from) + }) + }).map(|_| { + println!("\n\nDone."); + }); - // wait till done - let _ = rx.recv(); - client.close(); + core.run(work).unwrap(); } diff --git a/examples/hello.rs b/examples/hello.rs index 221a990895..a9e45d3cd3 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -1,50 +1,39 @@ #![deny(warnings)] extern crate hyper; -extern crate env_logger; -extern crate num_cpus; +extern crate futures; +extern crate pretty_env_logger; +//extern crate num_cpus; -use hyper::{Decoder, Encoder, Next, HttpStream}; -use hyper::server::{Server, Handler, Request, Response, HttpListener}; +use hyper::header::{ContentLength, ContentType}; +use hyper::server::{Server, Service, Request, Response}; static PHRASE: &'static [u8] = b"Hello World!"; +#[derive(Clone, Copy)] struct Hello; -impl Handler for Hello { - fn on_request(&mut self, _: Request) -> Next { - Next::write() - } - fn on_request_readable(&mut self, _: &mut Decoder) -> Next { - Next::write() - } - fn on_response(&mut self, response: &mut Response) -> Next { - use hyper::header::ContentLength; - response.headers_mut().set(ContentLength(PHRASE.len() as u64)); - Next::write() - } - fn on_response_writable(&mut self, encoder: &mut Encoder) -> Next { - let n = encoder.write(PHRASE).unwrap(); - debug_assert_eq!(n, PHRASE.len()); - Next::end() +impl Service for Hello { + type Request = Request; + type Response = Response; + type Error = hyper::Error; + type Future = ::futures::Finished; + fn call(&self, _req: Request) -> Self::Future { + ::futures::finished( + Response::new() + .with_header(ContentLength(PHRASE.len() as u64)) + .with_header(ContentType::plaintext()) + .with_body(PHRASE) + ) } + } fn main() { - env_logger::init().unwrap(); - - let listener = HttpListener::bind(&"127.0.0.1:3000".parse().unwrap()).unwrap(); - let mut handles = Vec::new(); - - for _ in 0..num_cpus::get() { - let listener = listener.try_clone().unwrap(); - handles.push(::std::thread::spawn(move || { - Server::new(listener) - .handle(|_| Hello).unwrap(); - })); - } - println!("Listening on http://127.0.0.1:3000"); - - for handle in handles { - handle.join().unwrap(); - } + pretty_env_logger::init().unwrap(); + let addr = "127.0.0.1:3000".parse().unwrap(); + let _server = Server::standalone(|tokio| { + Server::http(&addr, tokio)? 
+ .handle(|| Ok(Hello), tokio) + }).unwrap(); + println!("Listening on http://{}", addr); } diff --git a/examples/server.rs b/examples/server.rs index a2dbe23c2b..f0578c3328 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,164 +1,57 @@ -#![deny(warnings)] +//#![deny(warnings)] +extern crate futures; extern crate hyper; -extern crate env_logger; +extern crate pretty_env_logger; #[macro_use] extern crate log; -use hyper::{Get, Post, StatusCode, RequestUri, Decoder, Encoder, HttpStream, Next}; +use hyper::{Get, Post, StatusCode}; use hyper::header::ContentLength; -use hyper::server::{Server, Handler, Request, Response}; +use hyper::server::{Server, Service, Request, Response}; -struct Echo { - buf: Vec, - read_pos: usize, - write_pos: usize, - eof: bool, - route: Route, -} - -enum Route { - NotFound, - Index, - Echo(Body), -} - -#[derive(Clone, Copy)] -enum Body { - Len(u64), - Chunked -} static INDEX: &'static [u8] = b"Try POST /echo"; -impl Echo { - fn new() -> Echo { - Echo { - buf: vec![0; 4096], - read_pos: 0, - write_pos: 0, - eof: false, - route: Route::NotFound, - } - } -} - -impl Handler for Echo { - fn on_request(&mut self, req: Request) -> Next { - match *req.uri() { - RequestUri::AbsolutePath { ref path, .. } => match (req.method(), &path[..]) { - (&Get, "/") | (&Get, "/echo") => { - info!("GET Index"); - self.route = Route::Index; - Next::write() - } - (&Post, "/echo") => { - info!("POST Echo"); - let mut is_more = true; - self.route = if let Some(len) = req.headers().get::() { - is_more = **len > 0; - Route::Echo(Body::Len(**len)) - } else { - Route::Echo(Body::Chunked) - }; - if is_more { - Next::read_and_write() - } else { - Next::write() - } - } - _ => Next::write(), +#[derive(Clone, Copy)] +struct Echo; + +impl Service for Echo { + type Request = Request; + type Response = Response; + type Error = hyper::Error; + type Future = ::futures::Finished; + + fn call(&self, req: Request) -> Self::Future { + ::futures::finished(match (req.method(), req.path()) { + (&Get, Some("/")) | (&Get, Some("/echo")) => { + Response::new() + .with_header(ContentLength(INDEX.len() as u64)) + .with_body(INDEX) }, - _ => Next::write() - } - } - fn on_request_readable(&mut self, transport: &mut Decoder) -> Next { - match self.route { - Route::Echo(ref body) => { - if self.read_pos < self.buf.len() { - match transport.try_read(&mut self.buf[self.read_pos..]) { - Ok(Some(0)) => { - debug!("Read 0, eof"); - self.eof = true; - Next::write() - }, - Ok(Some(n)) => { - self.read_pos += n; - match *body { - Body::Len(max) if max <= self.read_pos as u64 => { - self.eof = true; - Next::write() - }, - _ => Next::read_and_write() - } - } - Ok(None) => Next::read_and_write(), - Err(e) => { - println!("read error {:?}", e); - Next::end() - } - } - } else { - Next::write() + (&Post, Some("/echo")) => { + let mut res = Response::new(); + if let Some(len) = req.headers().get::() { + res.headers_mut().set(len.clone()); } + res.with_body(req.body()) + }, + _ => { + Response::new() + .with_status(StatusCode::NotFound) } - _ => unreachable!() - } - } - - fn on_response(&mut self, res: &mut Response) -> Next { - match self.route { - Route::NotFound => { - res.set_status(StatusCode::NotFound); - Next::end() - } - Route::Index => { - res.headers_mut().set(ContentLength(INDEX.len() as u64)); - Next::write() - } - Route::Echo(body) => { - if let Body::Len(len) = body { - res.headers_mut().set(ContentLength(len)); - } - Next::read_and_write() - } - } + }) } - fn on_response_writable(&mut self, transport: &mut 
Encoder) -> Next { - match self.route { - Route::Index => { - transport.write(INDEX).unwrap(); - Next::end() - } - Route::Echo(..) => { - if self.write_pos < self.read_pos { - match transport.try_write(&self.buf[self.write_pos..self.read_pos]) { - Ok(Some(0)) => panic!("write ZERO"), - Ok(Some(n)) => { - self.write_pos += n; - Next::write() - } - Ok(None) => Next::write(), - Err(e) => { - println!("write error {:?}", e); - Next::end() - } - } - } else if !self.eof { - Next::read() - } else { - Next::end() - } - } - _ => unreachable!() - } - } } + fn main() { - env_logger::init().unwrap(); - let server = Server::http(&"127.0.0.1:1337".parse().unwrap()).unwrap(); - let (listening, server) = server.handle(|_| Echo::new()).unwrap(); + pretty_env_logger::init().unwrap(); + let addr = "127.0.0.1:1337".parse().unwrap(); + let (listening, server) = Server::standalone(|tokio| { + Server::http(&addr, tokio)? + .handle(|| Ok(Echo), tokio) + }).unwrap(); println!("Listening on http://{}", listening); server.run(); } diff --git a/src/client/connect.rs b/src/client/connect.rs index 1ad3136572..babb24bb4f 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -1,59 +1,59 @@ -use std::collections::hash_map::{HashMap, Entry}; -use std::hash::Hash; use std::fmt; use std::io; -use std::net::SocketAddr; +//use std::net::SocketAddr; -use rotor::mio::tcp::TcpStream; +use futures::{Future, Poll, Async}; +use tokio::io::Io; +use tokio::reactor::Handle; +use tokio::net::{TcpStream, TcpStreamNew}; +use tokio_service::Service; use url::Url; -use net::{HttpStream, HttpsStream, Transport, SslClient}; -use super::dns::Dns; -use super::Registration; - -/// A connector creates a Transport to a remote address.. -pub trait Connect { - /// Type of Transport to create - type Output: Transport; - /// The key used to determine if an existing socket can be used. - type Key: Eq + Hash + Clone + fmt::Debug; - /// Returns the key based off the Url. - fn key(&self, &Url) -> Option; +use super::dns; + +/// A connector creates an Io to a remote address.. +/// +/// This trait is not implemented directly, and only exists to make +/// the intent clearer. A connector should implement `Service` with +/// `Request=Url` and `Response: Io` instead. +pub trait Connect: Service + 'static { + /// The connected Io Stream. + type Output: Io + 'static; + /// A Future that will resolve to the connected Stream. + type Future: Future + 'static; /// Connect to a remote address. - fn connect(&mut self, &Url) -> io::Result; - /// Returns a connected socket and associated host. - fn connected(&mut self) -> Option<(Self::Key, io::Result)>; - #[doc(hidden)] - /// Configure number of dns workers to use. - fn dns_workers(&mut self, usize); - #[doc(hidden)] - fn register(&mut self, Registration); + fn connect(&self, Url) -> ::Future; +} + +impl Connect for T +where T: Service + 'static, + T::Response: Io, + T::Future: Future, +{ + type Output = T::Response; + type Future = T::Future; + + fn connect(&self, url: Url) -> ::Future { + self.call(url) + } } /// A connector for the `http` scheme. +#[derive(Clone)] pub struct HttpConnector { - dns: Option, - threads: usize, - resolving: HashMap>, + dns: dns::Dns, + handle: Handle, } impl HttpConnector { - /// Set the number of resolver threads. - /// - /// Default is 4. 
- pub fn threads(mut self, threads: usize) -> HttpConnector { - debug_assert!(self.dns.is_none(), "setting threads after Dns is created does nothing"); - self.threads = threads; - self - } -} -impl Default for HttpConnector { - fn default() -> HttpConnector { + /// Construct a new HttpConnector. + /// + /// Takes number of DNS worker threads. + pub fn new(threads: usize, handle: &Handle) -> HttpConnector { HttpConnector { - dns: None, - threads: 4, - resolving: HashMap::new(), + dns: dns::Dns::new(threads), + handle: handle.clone(), } } } @@ -61,79 +61,115 @@ impl Default for HttpConnector { impl fmt::Debug for HttpConnector { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("HttpConnector") - .field("threads", &self.threads) - .field("resolving", &self.resolving) .finish() } } -impl Connect for HttpConnector { - type Output = HttpStream; - type Key = (&'static str, String, u16); +impl Service for HttpConnector { + type Request = Url; + type Response = TcpStream; + type Error = io::Error; + type Future = HttpConnecting; - fn dns_workers(&mut self, count: usize) { - self.threads = count; - } + fn call(&self, url: Url) -> Self::Future { + debug!("Http::connect({:?})", url); + let host = match url.host_str() { + Some(s) => s, + None => return HttpConnecting { + state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, "invalid url"))), + handle: self.handle.clone(), + }, + }; + let port = url.port_or_known_default().unwrap_or(80); - fn key(&self, url: &Url) -> Option { - if url.scheme() == "http" { - Some(( - "http", - url.host_str().expect("http scheme must have host").to_owned(), - url.port().unwrap_or(80), - )) - } else { - None + HttpConnecting { + state: State::Resolving(self.dns.resolve(host.into(), port)), + handle: self.handle.clone(), } } - fn connect(&mut self, url: &Url) -> io::Result { - debug!("Http::connect({:?})", url); - if let Some(key) = self.key(url) { - let host = url.host_str().expect("http scheme must have a host"); - self.dns.as_ref().expect("dns workers lost").resolve(host); - self.resolving.entry(host.to_owned()).or_insert_with(Vec::new).push(key.clone()); - Ok(key) - } else { - Err(io::Error::new(io::ErrorKind::InvalidInput, "scheme must be http")) - } - } +} - fn connected(&mut self) -> Option<(Self::Key, io::Result)> { - let (host, addrs) = match self.dns.as_ref().expect("dns workers lost").resolved() { - Ok(res) => res, - Err(_) => return None - }; - //TODO: try all addrs - let addr = addrs.and_then(|mut addrs| Ok(addrs.next().unwrap())); - debug!("Http::resolved <- ({:?}, {:?})", host, addr); - if let Entry::Occupied(mut entry) = self.resolving.entry(host) { - let resolved = entry.get_mut().remove(0); - if entry.get().is_empty() { - entry.remove(); +/// A Future representing work to connect to a URL. 
+pub struct HttpConnecting { + state: State, + handle: Handle, +} + +enum State { + Resolving(dns::Query), + Connecting(ConnectingTcp), + Error(Option), +} + +impl Future for HttpConnecting { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let state; + match self.state { + State::Resolving(ref mut query) => { + match try!(query.poll()) { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(addrs) => { + state = State::Connecting(ConnectingTcp { + addrs: addrs, + current: None, + }) + } + }; + }, + State::Connecting(ref mut c) => return c.poll(&self.handle).map_err(From::from), + State::Error(ref mut e) => return Err(e.take().expect("polled more than once")), } - let port = resolved.2; - Some((resolved, addr.and_then(|addr| TcpStream::connect(&SocketAddr::new(addr, port)) - .map(HttpStream)) - )) - } else { - trace!("^-- resolved but not in hashmap?"); - None + self.state = state; } } +} - fn register(&mut self, reg: Registration) { - self.dns = Some(Dns::new(reg.notify, self.threads)); +impl fmt::Debug for HttpConnecting { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("HttpConnecting") } } -/// A connector that can protect HTTP streams using SSL. -#[derive(Debug, Default)] -pub struct HttpsConnector { - http: HttpConnector, - ssl: S +struct ConnectingTcp { + addrs: dns::IpAddrs, + current: Option, +} + +impl ConnectingTcp { + // not a Future, since passing a &Handle to poll + fn poll(&mut self, handle: &Handle) -> Poll { + let mut err = None; + loop { + if let Some(ref mut current) = self.current { + match current.poll() { + Ok(ok) => return Ok(ok), + Err(e) => { + trace!("connect error {:?}", e); + err = Some(e); + if let Some(addr) = self.addrs.next() { + debug!("connecting to {:?}", addr); + *current = TcpStream::connect(&addr, handle); + continue; + } + } + } + } else if let Some(addr) = self.addrs.next() { + debug!("connecting to {:?}", addr); + self.current = Some(TcpStream::connect(&addr, handle)); + continue; + } + + return Err(err.take().expect("missing connect error")); + } + } } +/* impl HttpsConnector { /// Create a new connector using the provided SSL implementation. 
pub fn new(s: S) -> HttpsConnector { @@ -143,80 +179,22 @@ impl HttpsConnector { } } } - -impl Connect for HttpsConnector { - type Output = HttpsStream; - type Key = (&'static str, String, u16); - - fn dns_workers(&mut self, count: usize) { - self.http.dns_workers(count) - } - - fn key(&self, url: &Url) -> Option { - let scheme = match url.scheme() { - "http" => "http", - "https" => "https", - _ => return None - }; - Some(( - scheme, - url.host_str().expect("http scheme must have host").to_owned(), - url.port_or_known_default().expect("http scheme must have a port"), - )) - } - - fn connect(&mut self, url: &Url) -> io::Result { - debug!("Https::connect({:?})", url); - if let Some(key) = self.key(url) { - let host = url.host_str().expect("http scheme must have a host"); - self.http.dns.as_ref().expect("dns workers lost").resolve(host); - self.http.resolving.entry(host.to_owned()).or_insert_with(Vec::new).push(key.clone()); - Ok(key) - } else { - Err(io::Error::new(io::ErrorKind::InvalidInput, "scheme must be http or https")) - } - } - - fn connected(&mut self) -> Option<(Self::Key, io::Result)> { - self.http.connected().map(|(key, res)| { - let res = res.and_then(|http| { - if key.0 == "https" { - self.ssl.wrap_client(http, &key.1) - .map(HttpsStream::Https) - .map_err(|e| match e { - ::Error::Io(e) => e, - e => io::Error::new(io::ErrorKind::Other, e) - }) - } else { - Ok(HttpsStream::Http(http)) - } - }); - (key, res) - }) - } - - fn register(&mut self, reg: Registration) { - self.http.register(reg); +*/ + +#[cfg(test)] +mod tests { + use std::io; + use tokio::reactor::Core; + use url::Url; + use super::{Connect, HttpConnector}; + + #[test] + fn test_non_http_url() { + let mut core = Core::new().unwrap(); + let url = Url::parse("file:///home/sean/foo.txt").unwrap(); + let connector = HttpConnector::new(1, &core.handle()); + + assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput); } -} - -#[cfg(not(any(feature = "openssl", feature = "security-framework")))] -#[doc(hidden)] -pub type DefaultConnector = HttpConnector; - -#[cfg(all(feature = "openssl", not(feature = "security-framework")))] -#[doc(hidden)] -pub type DefaultConnector = HttpsConnector<::net::Openssl>; - -#[cfg(feature = "security-framework")] -#[doc(hidden)] -pub type DefaultConnector = HttpsConnector<::net::SecureTransportClient>; - -#[doc(hidden)] -pub type DefaultTransport = ::Output; - -fn _assert_defaults() { - fn _assert() where T: Connect, U: Transport {} - _assert::(); } diff --git a/src/client/dns.rs b/src/client/dns.rs index c0dcd359c7..5badd5c5f7 100644 --- a/src/client/dns.rs +++ b/src/client/dns.rs @@ -1,96 +1,53 @@ use std::io; -use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; -use std::thread; +use std::net::{SocketAddr, ToSocketAddrs}; use std::vec; -use ::spmc; - -use http::channel; +use ::futures::{Future, Poll}; +use ::futures_cpupool::{CpuPool, CpuFuture}; +#[derive(Clone)] pub struct Dns { - tx: spmc::Sender, - rx: channel::Receiver, -} - -pub type Answer = (String, io::Result); - -pub struct IpAddrs { - iter: vec::IntoIter, -} - -impl Iterator for IpAddrs { - type Item = IpAddr; - #[inline] - fn next(&mut self) -> Option { - self.iter.next().map(|addr| addr.ip()) - } + pool: CpuPool, } impl Dns { - pub fn new(notify: (channel::Sender, channel::Receiver), threads: usize) -> Dns { - let (tx, rx) = spmc::channel(); - for _ in 0..threads { - work(rx.clone(), notify.0.clone()); - } + pub fn new(threads: usize) -> Dns { Dns { - tx: tx, - rx: notify.1, + pool: 
CpuPool::new(threads) } } - pub fn resolve>(&self, hostname: T) { - self.tx.send(hostname.into()).expect("DNS workers all died unexpectedly"); - } - - pub fn resolved(&self) -> Result { - self.rx.try_recv() + pub fn resolve(&self, host: String, port: u16) -> Query { + Query(self.pool.spawn_fn(move || work(host, port))) } } -fn work(rx: spmc::Receiver, notify: channel::Sender) { - thread::Builder::new().name(String::from("hyper-dns")).spawn(move || { - let mut worker = Worker::new(rx, notify); - let rx = worker.rx.as_ref().expect("Worker lost rx"); - let notify = worker.notify.as_ref().expect("Worker lost notify"); - while let Ok(host) = rx.recv() { - debug!("resolve {:?}", host); - let res = match (&*host, 80).to_socket_addrs().map(|i| IpAddrs{ iter: i }) { - Ok(addrs) => (host, Ok(addrs)), - Err(e) => (host, Err(e)) - }; +pub struct Query(CpuFuture); - if let Err(_) = notify.send(res) { - break; - } - } - worker.shutdown = true; - }).expect("spawn dns thread"); +impl Future for Query { + type Item = IpAddrs; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + self.0.poll() + } } -struct Worker { - rx: Option>, - notify: Option>, - shutdown: bool, +pub struct IpAddrs { + iter: vec::IntoIter, } -impl Worker { - fn new(rx: spmc::Receiver, notify: channel::Sender) -> Worker { - Worker { - rx: Some(rx), - notify: Some(notify), - shutdown: false, - } +impl Iterator for IpAddrs { + type Item = SocketAddr; + #[inline] + fn next(&mut self) -> Option { + self.iter.next() } } -impl Drop for Worker { - fn drop(&mut self) { - if !self.shutdown { - trace!("Worker.drop panicked, restarting"); - work(self.rx.take().expect("Worker lost rx"), - self.notify.take().expect("Worker lost notify")); - } else { - trace!("Worker.drop shutdown, closing"); - } - } +pub type Answer = io::Result; + +fn work(hostname: String, port: u16) -> Answer { + debug!("resolve {:?}:{:?}", hostname, port); + (&*hostname, port).to_socket_addrs().map(|i| IpAddrs { iter: i }) } diff --git a/src/client/mod.rs b/src/client/mod.rs index a779c29dee..80fa6006d4 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -3,51 +3,49 @@ //! The HTTP `Client` uses asynchronous IO, and utilizes the `Handler` trait //! to convey when IO events are available for a given request. -use std::collections::{VecDeque, HashMap}; +use std::cell::RefCell; use std::fmt; use std::io; -use std::marker::PhantomData; -use std::sync::mpsc; -use std::thread; +use std::rc::Rc; use std::time::Duration; -use rotor::{self, Scope, EventSet, PollOpt}; - -use header::Host; -use http::{self, Next, RequestHead, ReadyResult}; -use net::Transport; +use futures::{Poll, Async, Future}; +use relay; +use tokio::io::Io; +use tokio::reactor::Handle; +use tokio_proto::BindClient; +use tokio_proto::streaming::Message; +use tokio_proto::streaming::pipeline::ClientProto; +use tokio_proto::util::client_proxy::ClientProxy; +pub use tokio_service::Service; + +use header::{Headers, Host}; +use http::{self, TokioBody}; +use method::Method; +use self::pool::{Pool, Pooled}; use uri::RequestUri; use {Url}; -pub use self::connect::{Connect, DefaultConnector, HttpConnector, HttpsConnector, DefaultTransport}; +pub use self::connect::{HttpConnector, Connect}; pub use self::request::Request; pub use self::response::Response; mod connect; mod dns; +mod pool; mod request; mod response; /// A Client to make outgoing HTTP requests. 
-pub struct Client { - tx: http::channel::Sender>, -} - -impl Clone for Client { - fn clone(&self) -> Client { - Client { - tx: self.tx.clone() - } - } -} - -impl fmt::Debug for Client { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("Client") - } +// If the Connector is clone, then the Client can be clone easily. +#[derive(Clone)] +pub struct Client { + connector: C, + handle: Handle, + pool: Pool, } -impl Client { +impl Client { /// Configure a Client. /// /// # Example @@ -56,116 +54,218 @@ impl Client { /// # use hyper::Client; /// let client = Client::configure() /// .keep_alive(true) - /// .max_sockets(10_000) /// .build().unwrap(); /// ``` #[inline] - pub fn configure() -> Config { + pub fn configure() -> Config { Config::default() } } -impl::Output>> Client { +impl Client { /// Create a new Client with the default config. #[inline] - pub fn new() -> ::Result> { - Client::::configure().build() + pub fn new(handle: &Handle) -> Client { + Client::configure().build(handle) } } -impl Client { +impl Client { /// Create a new client with a specific connector. - fn configured(config: Config) -> ::Result> - where H: Handler, - T: Transport, - C: Connect + Send + 'static { - let mut rotor_config = rotor::Config::new(); - rotor_config.slab_capacity(config.max_sockets); - rotor_config.mio().notify_capacity(config.max_sockets); - let keep_alive = config.keep_alive; - let connect_timeout = config.connect_timeout; - let mut loop_ = try!(rotor::Loop::new(&rotor_config)); - let mut notifier = None; - let mut connector = config.connector; - connector.dns_workers(config.dns_workers); - { - let not = &mut notifier; - loop_.add_machine_with(move |scope| { - let (tx, rx) = http::channel::new(scope.notifier()); - let (dns_tx, dns_rx) = http::channel::share(&tx); - *not = Some(tx); - connector.register(Registration { - notify: (dns_tx, dns_rx), - }); - rotor::Response::ok(ClientFsm::Connector(connector, rx)) - }).unwrap(); + #[inline] + fn configured(config: Config, handle: &Handle) -> Client { + Client { + connector: config.connector, + handle: handle.clone(), + pool: Pool::new(config.keep_alive, config.keep_alive_timeout), } + } - let notifier = notifier.expect("loop.add_machine_with failed"); - let _handle = try!(thread::Builder::new().name("hyper-client".to_owned()).spawn(move || { - loop_.run(Context { - connect_timeout: connect_timeout, - keep_alive: keep_alive, - idle_conns: HashMap::new(), - queue: HashMap::new(), - awaiting_slot: VecDeque::new(), - }).unwrap() - })); - - Ok(Client { - //handle: Some(handle), - tx: notifier, - }) + /// Send a GET Request using this Client. + #[inline] + pub fn get(&self, url: Url) -> FutureResponse { + self.request(Request::new(Method::Get, url)) } - /// Build a new request using this Client. - /// - /// ## Error - /// - /// If the event loop thread has died, or the queue is full, a `ClientError` - /// will be returned. - pub fn request(&self, url: Url, handler: H) -> Result<(), ClientError> { - self.tx.send(Notify::Connect(url, handler)).map_err(|e| { - match e.0 { - Some(Notify::Connect(url, handler)) => ClientError(Some((url, handler))), - _ => ClientError(None) + /// Send a constructed Request using this Client. + #[inline] + pub fn request(&self, req: Request) -> FutureResponse { + self.call(req) + } +} + +/// A `Future` that will resolve to an HTTP Response. 
+pub struct FutureResponse(Box + 'static>); + +impl fmt::Debug for FutureResponse { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("Future") + } +} + +impl Future for FutureResponse { + type Item = Response; + type Error = ::Error; + + fn poll(&mut self) -> Poll { + self.0.poll() + } +} + +impl Service for Client { + type Request = Request; + type Response = Response; + type Error = ::Error; + type Future = FutureResponse; + + fn call(&self, req: Request) -> Self::Future { + let url = req.url().clone(); + + let (mut head, body) = request::split(req); + let mut headers = Headers::new(); + headers.set(Host { + hostname: url.host_str().unwrap().to_owned(), + port: url.port().or(None), + }); + headers.extend(head.headers.iter()); + head.subject.1 = RequestUri::AbsolutePath { + path: url.path().to_owned(), + query: url.query().map(ToOwned::to_owned), + }; + head.headers = headers; + + let checkout = self.pool.checkout(&url[..::url::Position::BeforePath]); + let connect = { + let handle = self.handle.clone(); + let pool = self.pool.clone(); + let pool_key = Rc::new(url[..::url::Position::BeforePath].to_owned()); + self.connector.connect(url) + .map(move |io| { + let (tx, rx) = relay::channel(); + let client = HttpClient { + client_rx: RefCell::new(Some(rx)), + }.bind_client(&handle, io); + let pooled = pool.pooled(pool_key, client); + tx.complete(pooled.clone()); + pooled + }) + }; + + let race = checkout.select(connect) + .map(|(client, _work)| client) + .map_err(|(e, _work)| { + // the Pool Checkout cannot error, so the only error + // is from the Connector + // XXX: should wait on the Checkout? Problem is + // that if the connector is failing, it may be that we + // never had a pooled stream at all + e.into() + }); + let req = race.and_then(move |client| { + let msg = match body { + Some(body) => { + Message::WithBody(head, body.into()) + }, + None => Message::WithoutBody(head), + }; + client.call(msg) + }); + FutureResponse(Box::new(req.map(|msg| { + match msg { + Message::WithoutBody(head) => response::new(head, None), + Message::WithBody(head, body) => response::new(head, Some(body.into())), } - }) + }))) + } + +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("Client") + } +} + +type TokioClient = ClientProxy, Message, ::Error>; + +struct HttpClient { + client_rx: RefCell>>>, +} + +impl ClientProto for HttpClient { + type Request = http::RequestHead; + type RequestBody = http::Chunk; + type Response = http::ResponseHead; + type ResponseBody = http::Chunk; + type Error = ::Error; + type Transport = http::Conn>; + type BindTransport = BindingClient; + + fn bind_transport(&self, io: T) -> Self::BindTransport { + BindingClient { + rx: self.client_rx.borrow_mut().take().expect("client_rx was lost"), + io: Some(io), + } } +} - /// Close the Client loop. - pub fn close(self) { - // Most errors mean that the Receivers are already dead, which would - // imply the EventLoop panicked. 
- let _ = self.tx.send(Notify::Shutdown); +struct BindingClient { + rx: relay::Receiver>, + io: Option, +} + +impl Future for BindingClient { + type Item = http::Conn>; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + match self.rx.poll() { + Ok(Async::Ready(client)) => Ok(Async::Ready( + http::Conn::new(self.io.take().expect("binding client io lost"), client) + )), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_canceled) => unreachable!(), + } } } /// Configuration for a Client #[derive(Debug, Clone)] pub struct Config { - connect_timeout: Duration, + //connect_timeout: Duration, connector: C, keep_alive: bool, keep_alive_timeout: Option, //TODO: make use of max_idle config max_idle: usize, - max_sockets: usize, - dns_workers: usize, } -impl Config where C: Connect + Send + 'static { +/// Phantom type used to signal that `Config` should create a `HttpConnector`. +#[derive(Debug, Clone, Copy)] +pub struct UseDefaultConnector(()); + +impl Config { + fn default() -> Config { + Config { + //connect_timeout: Duration::from_secs(10), + connector: UseDefaultConnector(()), + keep_alive: true, + keep_alive_timeout: Some(Duration::from_secs(90)), + max_idle: 5, + } + } +} + +impl Config { /// Set the `Connect` type to be used. #[inline] pub fn connector(self, val: CC) -> Config { Config { - connect_timeout: self.connect_timeout, + //connect_timeout: self.connect_timeout, connector: val, keep_alive: self.keep_alive, - keep_alive_timeout: Some(Duration::from_secs(60 * 2)), + keep_alive_timeout: self.keep_alive_timeout, max_idle: self.max_idle, - max_sockets: self.max_sockets, - dns_workers: self.dns_workers, } } @@ -189,15 +289,7 @@ impl Config where C: Connect + Send + 'static { self } - /// Set the max table size allocated for holding on to live sockets. - /// - /// Default is 1024. - #[inline] - pub fn max_sockets(mut self, val: usize) -> Config { - self.max_sockets = val; - self - } - + /* /// Set the timeout for connecting to a URL. /// /// Default is 10 seconds. @@ -206,584 +298,25 @@ impl Config where C: Connect + Send + 'static { self.connect_timeout = val; self } + */ +} - /// Set number of Dns workers to use for this client - /// - /// Default is 4 - #[inline] - pub fn dns_workers(mut self, workers: usize) -> Config { - self.dns_workers = workers; - self - } - +impl Config { /// Construct the Client with this configuration. #[inline] - pub fn build>(self) -> ::Result> { - Client::configured(self) - } -} - -impl Default for Config { - fn default() -> Config { - Config { - connect_timeout: Duration::from_secs(10), - connector: DefaultConnector::default(), - keep_alive: true, - keep_alive_timeout: Some(Duration::from_secs(60 * 2)), - max_idle: 5, - max_sockets: 1024, - dns_workers: 4, - } - } -} - -/// An error that can occur when trying to queue a request. -#[derive(Debug)] -pub struct ClientError(Option<(Url, H)>); - -impl ClientError { - /// If the event loop was down, the `Url` and `Handler` can be recovered - /// from this method. - pub fn recover(self) -> Option<(Url, H)> { - self.0 - } -} - -impl ::std::error::Error for ClientError { - fn description(&self) -> &str { - "Cannot queue request" - } -} - -impl fmt::Display for ClientError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("Cannot queue request") - } -} - -/// A trait to react to client events that happen for each message. -/// -/// Each event handler returns it's desired `Next` action. 
-pub trait Handler: Send + 'static { - /// This event occurs first, triggering when a `Request` head can be written.. - fn on_request(&mut self, request: &mut Request) -> http::Next; - /// This event occurs each time the `Request` is ready to be written to. - fn on_request_writable(&mut self, request: &mut http::Encoder) -> http::Next; - /// This event occurs after the first time this handler signals `Next::read()`, - /// and a Response has been parsed. - fn on_response(&mut self, response: Response) -> http::Next; - /// This event occurs each time the `Response` is ready to be read from. - fn on_response_readable(&mut self, response: &mut http::Decoder) -> http::Next; - - /// This event occurs whenever an `Error` occurs outside of the other events. - /// - /// This could IO errors while waiting for events, or a timeout, etc. - fn on_error(&mut self, err: ::Error) -> http::Next { - debug!("default Handler.on_error({:?})", err); - http::Next::remove() - } - - /// This event occurs when this Handler has requested to remove the Transport. - fn on_remove(self, _transport: T) where Self: Sized { - debug!("default Handler.on_remove"); - } - - /// Receive a `Control` to manage waiting for this request. - fn on_control(&mut self, _: http::Control) { - debug!("default Handler.on_control()"); - } -} - -struct Message, T: Transport> { - handler: H, - url: Option, - _marker: PhantomData, -} - -impl, T: Transport> http::MessageHandler for Message { - type Message = http::ClientMessage; - - fn on_outgoing(&mut self, head: &mut RequestHead) -> Next { - let url = self.url.take().expect("Message.url is missing"); - if let Some(host) = url.host_str() { - head.headers.set(Host { - hostname: host.to_owned(), - port: url.port(), - }); - } - head.subject.1 = RequestUri::AbsolutePath { - path: url.path().to_owned(), - query: url.query().map(|q| q.to_owned()), - }; - let mut req = self::request::new(head); - self.handler.on_request(&mut req) - } - - fn on_encode(&mut self, transport: &mut http::Encoder) -> Next { - self.handler.on_request_writable(transport) - } - - fn on_incoming(&mut self, head: http::ResponseHead, _: &T) -> Next { - trace!("on_incoming {:?}", head); - let resp = response::new(head); - self.handler.on_response(resp) - } - - fn on_decode(&mut self, transport: &mut http::Decoder) -> Next { - self.handler.on_response_readable(transport) - } - - fn on_error(&mut self, error: ::Error) -> Next { - self.handler.on_error(error) - } - - fn on_remove(self, transport: T) { - self.handler.on_remove(transport); - } -} - -struct Context { - connect_timeout: Duration, - keep_alive: bool, - idle_conns: HashMap>, - queue: HashMap>>, - awaiting_slot: VecDeque<(C::Key, C::Output)>, -} - -/// Macro for advancing state of a ClientFsm::Socket -/// -/// This was previously a method on Context, but due to eviction needs, this -/// block now needs access to the registration APIs on rotor::Scope. -macro_rules! 
conn_response { - ($scope:expr, $conn:expr, $time:expr) => {{ - match $conn { - Some((conn, timeout)) => { - //TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream - if conn.is_idle() { - $scope.idle_conns.entry(conn.key().clone()).or_insert_with(VecDeque::new) - .push_back(conn.control()); - } - match timeout { - Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn)) - .deadline($time + dur), - None => rotor::Response::ok(ClientFsm::Socket(conn)), - } - - } - None => { - if let Some((key, socket)) = $scope.awaiting_slot.pop_front() { - rotor_try!($scope.register(&socket, EventSet::writable() | EventSet::hup(), PollOpt::level())); - rotor::Response::ok(ClientFsm::Connecting((key, socket))) - } else { - rotor::Response::done() - } - } - } - }} -} - -impl Context { - fn pop_queue(&mut self, key: &K) -> Option> { - let mut should_remove = false; - let queued = { - self.queue.get_mut(key).and_then(|vec| { - let queued = vec.pop_front(); - if vec.is_empty() { - should_remove = true; - } - queued - }) - }; - if should_remove { - self.queue.remove(key); - } - - queued - } -} - -impl http::MessageHandlerFactory for Context - where K: http::Key, - H: Handler, - T: Transport, - C: Connect -{ - type Output = Message; - - fn create(&mut self, seed: http::Seed) -> Option { - let key = seed.key(); - self.pop_queue(key).map(|queued| { - let (url, mut handler) = (queued.url, queued.handler); - handler.on_control(seed.control()); - - Message { - handler: handler, - url: Some(url), - _marker: PhantomData, - } - }) + pub fn build(self, handle: &Handle) -> Client { + Client::configured(self, handle) } - - fn keep_alive_interest(&self) -> Next { - Next::wait() - } -} - -enum Notify { - Connect(Url, T), - Shutdown, -} - -enum ClientFsm -where C: Connect, - C::Output: Transport, - H: Handler { - Connector(C, http::channel::Receiver>), - Connecting((C::Key, C::Output)), - Socket(http::Conn>) } -unsafe impl Send for ClientFsm -where - C: Connect + Send, - //C::Key, // Key doesn't need to be Send - C::Output: Transport, // Tranport doesn't need to be Send - H: Handler + Send -{} - -impl rotor::Machine for ClientFsm -where C: Connect, - C::Key: fmt::Debug, - C::Output: Transport, - H: Handler { - type Context = Context; - type Seed = (C::Key, C::Output); - - fn create(seed: Self::Seed, scope: &mut Scope) -> rotor::Response { - rotor_try!(scope.register(&seed.1, EventSet::writable() | EventSet::hup(), PollOpt::level())); - rotor::Response::ok(ClientFsm::Connecting(seed)) - } - - fn ready(self, events: EventSet, scope: &mut Scope) -> rotor::Response { - match self { - ClientFsm::Socket(conn) => { - let mut conn = Some(conn); - loop { - match conn.take().unwrap().ready(events, scope) { - ReadyResult::Done(res) => { - let now = scope.now(); - return conn_response!(scope, res, now); - }, - ReadyResult::Continue(c) => conn = Some(c), - } - } - }, - ClientFsm::Connecting(mut seed) => { - if events.is_error() || events.is_hup() { - if let Some(err) = seed.1.take_socket_error().err() { - debug!("error while connecting: {:?}", err); - scope.pop_queue(&seed.0).map(move |mut queued| queued.handler.on_error(::Error::Io(err))); - } else { - trace!("connecting is_error, but no socket error"); - } - - rotor::Response::done() - } else if events.is_writable() { - if scope.queue.contains_key(&seed.0) { - trace!("connected and writable {:?}", seed.0); - rotor::Response::ok( - ClientFsm::Socket( - http::Conn::new( - seed.0, - seed.1, - Next::write().timeout(scope.connect_timeout), - scope.notifier(), - 
scope.now() - ).keep_alive(scope.keep_alive) - ) - ) - } else { - trace!("connected, but queued handler is gone: {:?}", seed.0); // probably took too long connecting - rotor::Response::done() - } - } else { - // spurious? - rotor::Response::ok(ClientFsm::Connecting(seed)) - } - } - ClientFsm::Connector(..) => { - unreachable!("Connector can never be ready") - }, - } - } - - fn spawned(self, scope: &mut Scope) -> rotor::Response { - match self { - ClientFsm::Connector(..) => self.connect(scope), - other => rotor::Response::ok(other) - } - } - - fn spawn_error( - self, - scope: &mut Scope, - error: rotor::SpawnError - ) -> rotor::Response { - // see if there's an idle connections that can be terminated. If yes, put this seed on a - // list waiting for empty slot. - if let rotor::SpawnError::NoSlabSpace((key, socket)) = error { - if let Some(mut queued) = scope.pop_queue(&key) { - trace!("attempting to remove an idle socket"); - // Remove an idle connection. Any connection. Just make some space - // for the new request. - let mut remove_keys = Vec::new(); - let mut found_idle = false; - - // Check all idle connections regardless of origin - for (key, idle) in scope.idle_conns.iter_mut() { - // Pop from the front since those are lease recently used - while let Some(ctrl) = idle.pop_front() { - // Signal connection to close. An err here means the - // socket is already dead can should be tossed. - if ctrl.ready(Next::remove()).is_ok() { - found_idle = true; - break; - } - } - - // This list is empty, mark it for removal - if idle.is_empty() { - remove_keys.push(key.to_owned()); - } - - // if found, stop looking for an idle connection. - if found_idle { - break; - } - } - - trace!("idle conns: {:?}", scope.idle_conns); - - // Remove empty idle lists. - for key in &remove_keys { - scope.idle_conns.remove(&key); - } - - if found_idle { - // A socket should be evicted soon; put it on a queue to - // consume newly freed slot. Also need to put the Queued - // back onto front of queue. - scope.awaiting_slot.push_back((key.clone(), socket)); - scope.queue - .entry(key) - .or_insert_with(VecDeque::new) - .push_back(queued); - } else { - // Couldn't evict a socket, just run the error handler. - debug!("Error spawning state machine; slab full and no sockets idle"); - let _ = queued.handler.on_error(::Error::Full); - } - } - } - - self.connect(scope) - } - - fn timeout(self, scope: &mut Scope) -> rotor::Response { - trace!("timeout now = {:?}", scope.now()); - match self { - ClientFsm::Connector(..) => { - let now = scope.now(); - let mut empty_keys = Vec::new(); - { - for (key, mut vec) in &mut scope.queue { - while !vec.is_empty() && vec[0].deadline <= now { - vec.pop_front() - .map(|mut queued| queued.handler.on_error(::Error::Timeout)); - } - if vec.is_empty() { - empty_keys.push(key.clone()); - } - } - } - for key in &empty_keys { - scope.queue.remove(key); - } - match self.deadline(scope) { - Some(deadline) => { - rotor::Response::ok(self).deadline(deadline) - }, - None => rotor::Response::ok(self) - } - } - ClientFsm::Connecting(..) => unreachable!(), - ClientFsm::Socket(conn) => { - let res = conn.timeout(scope); - let now = scope.now(); - conn_response!(scope, res, now) - } - } - } - - fn wakeup(self, scope: &mut Scope) -> rotor::Response { - match self { - ClientFsm::Connector(..) => { - self.connect(scope) - }, - ClientFsm::Socket(conn) => { - let res = conn.wakeup(scope); - let now = scope.now(); - conn_response!(scope, res, now) - }, - ClientFsm::Connecting(..) 
=> unreachable!("connecting sockets should not be woken up") - } - } -} - -impl ClientFsm -where C: Connect, - C::Key: fmt::Debug, - C::Output: Transport, - H: Handler { - fn connect(self, scope: &mut rotor::Scope<::Context>) -> rotor::Response::Seed> { - match self { - ClientFsm::Connector(mut connector, rx) => { - if let Some((key, res)) = connector.connected() { - match res { - Ok(socket) => { - trace!("connecting {:?}", key); - return rotor::Response::spawn(ClientFsm::Connector(connector, rx), (key, socket)); - }, - Err(e) => { - trace!("connect error = {:?}", e); - scope.pop_queue(&key).map(|mut queued| queued.handler.on_error(::Error::Io(e))); - } - } - } - loop { - match rx.try_recv() { - Ok(Notify::Connect(url, mut handler)) => { - // check pool for sockets to this domain - if let Some(key) = connector.key(&url) { - let mut remove_idle = false; - let mut woke_up = false; - if let Some(mut idle) = scope.idle_conns.get_mut(&key) { - // Pop from back since those are most recently used. Connections - // at the front are allowed to expire. - while let Some(ctrl) = idle.pop_back() { - // err means the socket has since died - if ctrl.ready(Next::write()).is_ok() { - woke_up = true; - break; - } - } - remove_idle = idle.is_empty(); - } - if remove_idle { - scope.idle_conns.remove(&key); - } - - if woke_up { - trace!("woke up idle conn for '{}'", url); - let deadline = scope.now() + scope.connect_timeout; - scope.queue - .entry(key) - .or_insert_with(VecDeque::new) - .push_back(Queued { - deadline: deadline, - handler: handler, - url: url - }); - continue; - } - } else { - // this connector cannot handle this url anyways - let _ = handler.on_error(io::Error::new(io::ErrorKind::InvalidInput, "invalid url for connector").into()); - continue; - } - // no exist connection, call connector - match connector.connect(&url) { - Ok(key) => { - let deadline = scope.now() + scope.connect_timeout; - scope.queue - .entry(key) - .or_insert_with(VecDeque::new) - .push_back(Queued { - deadline: deadline, - handler: handler, - url: url - }); - } - Err(e) => { - let _todo = handler.on_error(e.into()); - trace!("Connect error, next={:?}", _todo); - continue; - } - } - } - Ok(Notify::Shutdown) => { - scope.shutdown_loop(); - return rotor::Response::done() - }, - Err(mpsc::TryRecvError::Disconnected) => { - // if there is no way to send additional requests, - // what more can the loop do? i suppose we should - // shutdown. - scope.shutdown_loop(); - return rotor::Response::done() - } - Err(mpsc::TryRecvError::Empty) => { - // spurious wakeup or loop is done - let fsm = ClientFsm::Connector(connector, rx); - return match fsm.deadline(scope) { - Some(deadline) => { - rotor::Response::ok(fsm).deadline(deadline) - }, - None => rotor::Response::ok(fsm) - }; - } - } - } - }, - other => rotor::Response::ok(other) - } - } - - fn deadline(&self, scope: &mut rotor::Scope<::Context>) -> Option { - match *self { - ClientFsm::Connector(..) => { - let mut earliest = None; - for vec in scope.queue.values() { - for queued in vec { - match earliest { - Some(ref mut earliest) => { - if queued.deadline < *earliest { - *earliest = queued.deadline; - } - } - None => earliest = Some(queued.deadline) - } - } - } - trace!("deadline = {:?}, now = {:?}", earliest, scope.now()); - earliest - } - _ => None - } +impl Config { + /// Construct the Client with this configuration. 
+ #[inline] + pub fn build(self, handle: &Handle) -> Client { + self.connector(HttpConnector::new(4, handle)).build(handle) } } -struct Queued { - deadline: rotor::Time, - handler: H, - url: Url, -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct Registration { - notify: (http::channel::Sender, http::channel::Receiver), -} #[cfg(test)] mod tests { diff --git a/src/client/pool.rs b/src/client/pool.rs new file mode 100644 index 0000000000..cb7dc97e50 --- /dev/null +++ b/src/client/pool.rs @@ -0,0 +1,353 @@ +use std::cell::{Cell, RefCell}; +use std::collections::{HashMap, VecDeque}; +use std::fmt; +use std::io; +use std::ops::{Deref, DerefMut, BitAndAssign}; +use std::rc::Rc; +use std::time::{Duration, Instant}; + +use futures::{Future, Async, Poll}; +use relay; + +use http::{KeepAlive, KA}; + +pub struct Pool { + inner: Rc>>, +} + +struct PoolInner { + enabled: bool, + idle: HashMap, Vec>>, + parked: HashMap, VecDeque>>>, + timeout: Option, +} + +impl Pool { + pub fn new(enabled: bool, timeout: Option) -> Pool { + Pool { + inner: Rc::new(RefCell::new(PoolInner { + enabled: enabled, + idle: HashMap::new(), + parked: HashMap::new(), + timeout: timeout, + })), + } + } + + pub fn checkout(&self, key: &str) -> Checkout { + Checkout { + key: Rc::new(key.to_owned()), + pool: self.clone(), + parked: None, + } + } + + fn put(&mut self, key: Rc, entry: Entry) { + trace!("Pool::put {:?}", key); + let mut remove_parked = false; + let tx = self.inner.borrow_mut().parked.get_mut(&key).and_then(|parked| { + let mut ret = None; + while let Some(tx) = parked.pop_front() { + if !tx.is_canceled() { + ret = Some(tx); + break; + } + trace!("Pool::put removing canceled parked {:?}", key); + } + remove_parked = parked.is_empty(); + ret + }); + if remove_parked { + self.inner.borrow_mut().parked.remove(&key); + } + + if let Some(tx) = tx { + trace!("Pool::put found parked {:?}", key); + tx.complete(entry); + } else { + self.inner.borrow_mut() + .idle.entry(key) + .or_insert(Vec::new()) + .push(entry); + } + } + + pub fn pooled(&self, key: Rc, value: T) -> Pooled { + trace!("Pool::pooled {:?}", key); + Pooled { + entry: Entry { + value: value, + is_reused: false, + status: Rc::new(Cell::new(KA::Busy)), + }, + key: key, + pool: self.clone(), + } + } + + fn is_enabled(&self) -> bool { + self.inner.borrow().enabled + } + + fn reuse(&self, key: Rc, mut entry: Entry) -> Pooled { + trace!("Pool::reuse {:?}", key); + entry.is_reused = true; + entry.status.set(KA::Busy); + Pooled { + entry: entry, + key: key, + pool: self.clone(), + } + } + + fn park(&mut self, key: Rc, tx: relay::Sender>) { + trace!("Pool::park {:?}", key); + self.inner.borrow_mut() + .parked.entry(key) + .or_insert(VecDeque::new()) + .push_back(tx); + } +} + +impl Clone for Pool { + fn clone(&self) -> Pool { + Pool { + inner: self.inner.clone(), + } + } +} + +#[derive(Clone)] +pub struct Pooled { + entry: Entry, + key: Rc, + pool: Pool, +} + +impl Deref for Pooled { + type Target = T; + fn deref(&self) -> &T { + &self.entry.value + } +} + +impl DerefMut for Pooled { + fn deref_mut(&mut self) -> &mut T { + &mut self.entry.value + } +} + +impl KeepAlive for Pooled { + fn busy(&mut self) { + self.entry.status.set(KA::Busy); + } + + fn disable(&mut self) { + self.entry.status.set(KA::Disabled); + } + + fn idle(&mut self) { + let previous = self.status(); + self.entry.status.set(KA::Idle(Instant::now())); + if let KA::Idle(..) 
= previous { + trace!("Pooled::idle already idle"); + return; + } + self.entry.is_reused = true; + if self.pool.is_enabled() { + self.pool.put(self.key.clone(), self.entry.clone()); + } + } + + fn status(&self) -> KA { + self.entry.status.get() + } +} + +impl fmt::Debug for Pooled { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Pooled") + .field("status", &self.entry.status.get()) + .field("key", &self.key) + .finish() + } +} + +impl BitAndAssign for Pooled { + fn bitand_assign(&mut self, enabled: bool) { + if !enabled { + self.disable(); + } + } +} + +#[derive(Clone)] +struct Entry { + value: T, + is_reused: bool, + status: Rc>, +} + +pub struct Checkout { + key: Rc, + pool: Pool, + parked: Option>>, +} + +impl Future for Checkout { + type Item = Pooled; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + trace!("Checkout::poll"); + let mut drop_parked = false; + if let Some(ref mut rx) = self.parked { + match rx.poll() { + Ok(Async::Ready(entry)) => { + trace!("Checkout::poll found client in relay for {:?}", self.key); + return Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))); + }, + Ok(Async::NotReady) => (), + Err(_canceled) => drop_parked = true, + } + } + if drop_parked { + self.parked.take(); + } + let expiration = Expiration::new(self.pool.inner.borrow().timeout); + let key = &self.key; + trace!("Checkout::poll url = {:?}, expiration = {:?}", key, expiration.0); + let mut should_remove = false; + let entry = self.pool.inner.borrow_mut().idle.get_mut(key).and_then(|list| { + trace!("Checkout::poll key found {:?}", key); + while let Some(entry) = list.pop() { + match entry.status.get() { + KA::Idle(idle_at) if !expiration.expires(idle_at) => { + trace!("Checkout::poll found idle client for {:?}", key); + should_remove = list.is_empty(); + return Some(entry); + }, + _ => { + trace!("Checkout::poll removing unacceptable pooled {:?}", key); + // every other case the Entry should just be dropped + // 1. Idle but expired + // 2. Busy (something else somehow took it?) + // 3. 
Disabled don't reuse of course + } + } + } + should_remove = true; + None + }); + + if should_remove { + self.pool.inner.borrow_mut().idle.remove(key); + } + match entry { + Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))), + None => { + if self.parked.is_none() { + let (tx, mut rx) = relay::channel(); + let _ = rx.poll(); // park this task + self.pool.park(self.key.clone(), tx); + self.parked = Some(rx); + } + Ok(Async::NotReady) + }, + } + } +} + +struct Expiration(Option); + +impl Expiration { + fn new(dur: Option) -> Expiration { + Expiration(dur.map(|dur| Instant::now() - dur)) + } + + fn expires(&self, instant: Instant) -> bool { + match self.0 { + Some(expire) => expire > instant, + None => false, + } + } +} + + +#[cfg(test)] +mod tests { + use std::rc::Rc; + use std::time::Duration; + use futures::{Async, Future}; + use http::KeepAlive; + use super::Pool; + + #[test] + fn test_pool_checkout_smoke() { + let pool = Pool::new(true, Some(Duration::from_secs(5))); + let key = Rc::new("foo".to_string()); + let mut pooled = pool.pooled(key.clone(), 41); + pooled.idle(); + + match pool.checkout(&key).poll().unwrap() { + Async::Ready(pooled) => assert_eq!(*pooled, 41), + _ => panic!("not ready"), + } + } + + #[test] + fn test_pool_checkout_returns_none_if_expired() { + ::futures::lazy(|| { + let pool = Pool::new(true, Some(Duration::from_secs(1))); + let key = Rc::new("foo".to_string()); + let mut pooled = pool.pooled(key.clone(), 41); + pooled.idle(); + ::std::thread::sleep(pool.inner.borrow().timeout.unwrap()); + assert!(pool.checkout(&key).poll().unwrap().is_not_ready()); + ::futures::future::ok::<(), ()>(()) + }).wait().unwrap(); + } + + #[test] + fn test_pool_removes_expired() { + let pool = Pool::new(true, Some(Duration::from_secs(1))); + let key = Rc::new("foo".to_string()); + + let mut pooled1 = pool.pooled(key.clone(), 41); + pooled1.idle(); + let mut pooled2 = pool.pooled(key.clone(), 5); + pooled2.idle(); + let mut pooled3 = pool.pooled(key.clone(), 99); + pooled3.idle(); + + + assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3)); + ::std::thread::sleep(pool.inner.borrow().timeout.unwrap()); + + pooled1.idle(); + pooled2.idle(); // idle after sleep, not expired + pool.checkout(&key).poll().unwrap(); + assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(1)); + pool.checkout(&key).poll().unwrap(); + assert!(pool.inner.borrow().idle.get(&key).is_none()); + } + + #[test] + fn test_pool_checkout_task_unparked() { + let pool = Pool::new(true, Some(Duration::from_secs(10))); + let key = Rc::new("foo".to_string()); + let pooled1 = pool.pooled(key.clone(), 41); + + let mut pooled = pooled1.clone(); + let checkout = pool.checkout(&key).join(::futures::lazy(move || { + // the checkout future will park first, + // and then this lazy future will be polled, which will insert + // the pooled back into the pool + // + // this test makes sure that doing so will unpark the checkout + pooled.idle(); + Ok(()) + })).map(|(entry, _)| entry); + assert_eq!(*checkout.wait().unwrap(), *pooled1); + } +} diff --git a/src/client/request.rs b/src/client/request.rs index e494f867ab..d74ea9ee96 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -1,55 +1,90 @@ -//! Client Requests +use std::fmt; + +use Url; use header::Headers; -use http::RequestHead; +use http::{Body, RequestHead}; use method::Method; use uri::RequestUri; use version::HttpVersion; - - /// A client request to a remote server. 
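Before the request changes below, a test-style sketch for the pool added above (written as if inside this module's `tests`, mirroring the imports there): the `Pooled` handle is also the `KeepAlive` implementation the connection layer uses, with `status()` tracking the busy/idle/disabled transitions and the `BitAndAssign` impl allowing `pooled &= keep_alive`.

```rust
use std::rc::Rc;
use std::time::Duration;

use http::{KeepAlive, KA};
use super::Pool;

#[test]
fn test_pooled_keep_alive_status() {
    let pool = Pool::new(true, Some(Duration::from_secs(30)));
    let key = Rc::new("http://hyper.rs".to_string());
    let mut pooled = pool.pooled(key, ());

    // A freshly pooled value starts out busy.
    match pooled.status() {
        KA::Busy => (),
        other => panic!("expected KA::Busy, got {:?}", other),
    }

    // `pooled &= false` is shorthand for disabling keep-alive,
    // so the entry will not be returned to the idle map.
    pooled &= false;
    match pooled.status() {
        KA::Disabled => (),
        other => panic!("expected KA::Disabled, got {:?}", other),
    }
}
```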
-#[derive(Debug)] -pub struct Request<'a> { - head: &'a mut RequestHead +pub struct Request { + method: Method, + url: Url, + version: HttpVersion, + headers: Headers, + body: Option, } -impl<'a> Request<'a> { +impl Request { + /// Construct a new Request. + #[inline] + pub fn new(method: Method, url: Url) -> Request { + Request { + method: method, + url: url, + version: HttpVersion::default(), + headers: Headers::new(), + body: None, + } + } + /// Read the Request Url. #[inline] - pub fn uri(&self) -> &RequestUri { &self.head.subject.1 } + pub fn url(&self) -> &Url { &self.url } /// Readthe Request Version. #[inline] - pub fn version(&self) -> &HttpVersion { &self.head.version } + pub fn version(&self) -> &HttpVersion { &self.version } /// Read the Request headers. #[inline] - pub fn headers(&self) -> &Headers { &self.head.headers } + pub fn headers(&self) -> &Headers { &self.headers } /// Read the Request method. #[inline] - pub fn method(&self) -> &Method { &self.head.subject.0 } + pub fn method(&self) -> &Method { &self.method } /// Set the Method of this request. #[inline] - pub fn set_method(&mut self, method: Method) { self.head.subject.0 = method; } + pub fn set_method(&mut self, method: Method) { self.method = method; } /// Get a mutable reference to the Request headers. #[inline] - pub fn headers_mut(&mut self) -> &mut Headers { &mut self.head.headers } + pub fn headers_mut(&mut self) -> &mut Headers { &mut self.headers } - /// Set the `RequestUri` of this request. + /// Set the `Url` of this request. #[inline] - pub fn set_uri(&mut self, uri: RequestUri) { self.head.subject.1 = uri; } + pub fn set_url(&mut self, url: Url) { self.url = url; } /// Set the `HttpVersion` of this request. #[inline] - pub fn set_version(&mut self, version: HttpVersion) { self.head.version = version; } + pub fn set_version(&mut self, version: HttpVersion) { self.version = version; } + + /// Set the body of the request. + #[inline] + pub fn set_body>(&mut self, body: T) { self.body = Some(body.into()); } +} + +impl fmt::Debug for Request { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Request") + .field("method", &self.method) + .field("url", &self.url) + .field("version", &self.version) + .field("headers", &self.headers) + .finish() + } } -pub fn new(head: &mut RequestHead) -> Request { - Request { head: head } +pub fn split(req: Request) -> (RequestHead, Option) { + let head = RequestHead { + subject: ::http::RequestLine(req.method, RequestUri::AbsoluteUri(req.url)), + headers: req.headers, + version: req.version, + }; + (head, req.body) } #[cfg(test)] diff --git a/src/client/response.rs b/src/client/response.rs index bb20aa6d9e..8249dbddd2 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,11 +1,11 @@ -//! Client Responses +use std::fmt; + use header; -//use net::NetworkStream; -use http::{self, RawStatus}; +use http::{self, RawStatus, Body}; use status; use version; -pub fn new(incoming: http::ResponseHead) -> Response { +pub fn new(incoming: http::ResponseHead, body: Option) -> Response { trace!("Response::new"); let status = status::StatusCode::from_u16(incoming.subject.0); debug!("version={:?}, status={:?}", incoming.version, status); @@ -16,17 +16,18 @@ pub fn new(incoming: http::ResponseHead) -> Response { version: incoming.version, headers: incoming.headers, status_raw: incoming.subject, + body: body, } } /// A response for a client request to a remote server. 
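To illustrate the reworked client `Request` above: it now owns its method, URL, headers, and optional streaming body instead of borrowing a `RequestHead`. A sketch (the `hyper::client::Request` re-export path is an assumption of this example):

```rust
extern crate hyper;

use hyper::{Method, Url};
use hyper::client::Request;
use hyper::header::ContentType;

fn main() {
    let url: Url = "http://hyper.rs/post".parse().expect("valid url");

    // Requests are plain owned values now; no handler callback is involved.
    let mut req = Request::new(Method::Post, url);
    req.headers_mut().set(ContentType::json());

    // Anything that is Into<Body> works: &'static str, String, Vec<u8>, ...
    req.set_body(r#"{"hello": "world"}"#);

    println!("{:?}", req);
}
```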
-#[derive(Debug)] pub struct Response { status: status::StatusCode, headers: header::Headers, version: version::HttpVersion, status_raw: RawStatus, + body: Option, } impl Response { @@ -42,170 +43,23 @@ impl Response { #[inline] pub fn status_raw(&self) -> &RawStatus { &self.status_raw } - /// Get the final URL of this response. - #[inline] - //pub fn url(&self) -> &Url { &self.url } - /// Get the HTTP version of this response from the server. #[inline] pub fn version(&self) -> &version::HttpVersion { &self.version } -} -/* -impl Drop for Response { - fn drop(&mut self) { - // if not drained, theres old bits in the Reader. we can't reuse this, - // since those old bits would end up in new Responses - // - // otherwise, the response has been drained. we should check that the - // server has agreed to keep the connection open - let is_drained = !self.message.has_body(); - trace!("Response.drop is_drained={}", is_drained); - if !(is_drained && http::should_keep_alive(self.version, &self.headers)) { - trace!("Response.drop closing connection"); - if let Err(e) = self.message.close_connection() { - error!("Response.drop error closing connection: {}", e); - } - } + /// Take the `Body` of this response. + #[inline] + pub fn body(mut self) -> Body { + self.body.take().unwrap_or(Body::empty()) } } -*/ - -#[cfg(test)] -mod tests { - /* - use std::io::{self, Read}; - - use url::Url; - - use header::TransferEncoding; - use header::Encoding; - use http::HttpMessage; - use mock::MockStream; - use status; - use version; - use http::h1::Http11Message; - - use super::Response; - - fn read_to_string(mut r: Response) -> io::Result { - let mut s = String::new(); - try!(r.read_to_string(&mut s)); - Ok(s) - } - - - #[test] - fn test_into_inner() { - let message: Box = Box::new( - Http11Message::with_stream(Box::new(MockStream::new()))); - let message = message.downcast::().ok().unwrap(); - let b = message.into_inner().downcast::().ok().unwrap(); - assert_eq!(b, Box::new(MockStream::new())); - } - - #[test] - fn test_parse_chunked_response() { - let stream = MockStream::with_input(b"\ - HTTP/1.1 200 OK\r\n\ - Transfer-Encoding: chunked\r\n\ - \r\n\ - 1\r\n\ - q\r\n\ - 2\r\n\ - we\r\n\ - 2\r\n\ - rt\r\n\ - 0\r\n\ - \r\n" - ); - - let url = Url::parse("http://hyper.rs").unwrap(); - let res = Response::new(url, Box::new(stream)).unwrap(); - - // The status line is correct? - assert_eq!(res.status, status::StatusCode::Ok); - assert_eq!(res.version, version::HttpVersion::Http11); - // The header is correct? - match res.headers.get::() { - Some(encodings) => { - assert_eq!(1, encodings.len()); - assert_eq!(Encoding::Chunked, encodings[0]); - }, - None => panic!("Transfer-Encoding: chunked expected!"), - }; - // The body is correct? - assert_eq!(read_to_string(res).unwrap(), "qwert".to_owned()); - } - - /// Tests that when a chunk size is not a valid radix-16 number, an error - /// is returned. - #[test] - fn test_invalid_chunk_size_not_hex_digit() { - let stream = MockStream::with_input(b"\ - HTTP/1.1 200 OK\r\n\ - Transfer-Encoding: chunked\r\n\ - \r\n\ - X\r\n\ - 1\r\n\ - 0\r\n\ - \r\n" - ); - - let url = Url::parse("http://hyper.rs").unwrap(); - let res = Response::new(url, Box::new(stream)).unwrap(); - - assert!(read_to_string(res).is_err()); - } - - /// Tests that when a chunk size contains an invalid extension, an error is - /// returned. 
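Since `body(self) -> Body` above hands back a plain futures `Stream` of `Chunk`s, response bodies compose with the usual combinators. A sketch of buffering a body into memory (the `hyper::client::Response` path is assumed, and the client call that produces the response is elided):

```rust
extern crate futures;
extern crate hyper;

use futures::{Future, Stream};
use hyper::client::Response;

fn collect_body(res: Response) -> Box<Future<Item = Vec<u8>, Error = hyper::Error>> {
    // fold drives the body stream to completion, appending each chunk.
    let work = res.body().fold(Vec::new(), |mut acc, chunk| {
        // Chunk derefs to [u8], so it can be appended directly.
        acc.extend_from_slice(&chunk);
        Ok::<_, hyper::Error>(acc)
    });
    Box::new(work)
}
```

The returned future would be run on the same `Core` that drives the client.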
- #[test] - fn test_invalid_chunk_size_extension() { - let stream = MockStream::with_input(b"\ - HTTP/1.1 200 OK\r\n\ - Transfer-Encoding: chunked\r\n\ - \r\n\ - 1 this is an invalid extension\r\n\ - 1\r\n\ - 0\r\n\ - \r\n" - ); - - let url = Url::parse("http://hyper.rs").unwrap(); - let res = Response::new(url, Box::new(stream)).unwrap(); - - assert!(read_to_string(res).is_err()); - } - - /// Tests that when a valid extension that contains a digit is appended to - /// the chunk size, the chunk is correctly read. - #[test] - fn test_chunk_size_with_extension() { - let stream = MockStream::with_input(b"\ - HTTP/1.1 200 OK\r\n\ - Transfer-Encoding: chunked\r\n\ - \r\n\ - 1;this is an extension with a digit 1\r\n\ - 1\r\n\ - 0\r\n\ - \r\n" - ); - - let url = Url::parse("http://hyper.rs").unwrap(); - let res = Response::new(url, Box::new(stream)).unwrap(); - - assert_eq!(read_to_string(res).unwrap(), "1".to_owned()); - } - - #[test] - fn test_parse_error_closes() { - let url = Url::parse("http://hyper.rs").unwrap(); - let stream = MockStream::with_input(b"\ - definitely not http - "); - assert!(Response::new(url, Box::new(stream)).is_err()); +impl fmt::Debug for Response { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Response") + .field("status", &self.status) + .field("version", &self.version) + .field("headers", &self.headers) + .finish() } - */ } diff --git a/src/error.rs b/src/error.rs index 77474101d1..463a6f94f0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -8,9 +8,6 @@ use std::string::FromUtf8Error; use httparse; use url; -#[cfg(feature = "openssl")] -use openssl::ssl::error::SslError; - use self::Error::{ Method, Uri, @@ -19,7 +16,6 @@ use self::Error::{ Status, Timeout, Io, - Ssl, TooLarge, Incomplete, Utf8 @@ -49,12 +45,8 @@ pub enum Error { Status, /// A timeout occurred waiting for an IO event. Timeout, - /// Event loop is full and cannot process request - Full, /// An `io::Error` that occurred while trying to read or write to a network stream. Io(IoError), - /// An error from a SSL library. 
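With the OpenSSL integration dropped in this patch, `Error` loses its `Full` and `Ssl` variants; callers match on the remaining ones and keep a catch-all arm because the enum is non-exhaustive. A minimal sketch:

```rust
extern crate hyper;

fn describe(err: &hyper::Error) -> &'static str {
    match *err {
        hyper::Error::Timeout => "timed out waiting for an IO event",
        hyper::Error::TooLarge => "message head exceeded the buffer limit",
        hyper::Error::Io(_) => "underlying transport error",
        // Error is non-exhaustive; always keep a fallback arm.
        _ => "other protocol error",
    }
}
```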
- Ssl(Box), /// Parsing a field as string failed Utf8(Utf8Error), @@ -76,7 +68,6 @@ impl fmt::Display for Error { match *self { Uri(ref e) => fmt::Display::fmt(e, f), Io(ref e) => fmt::Display::fmt(e, f), - Ssl(ref e) => fmt::Display::fmt(e, f), Utf8(ref e) => fmt::Display::fmt(e, f), ref e => f.write_str(e.description()), } @@ -93,10 +84,8 @@ impl StdError for Error { Status => "Invalid Status provided", Incomplete => "Message is incomplete", Timeout => "Timeout", - Error::Full => "Event loop is full", Uri(ref e) => e.description(), Io(ref e) => e.description(), - Ssl(ref e) => e.description(), Utf8(ref e) => e.description(), Error::__Nonexhaustive(ref void) => match *void {} } @@ -105,8 +94,9 @@ impl StdError for Error { fn cause(&self) -> Option<&StdError> { match *self { Io(ref error) => Some(error), - Ssl(ref error) => Some(&**error), Uri(ref error) => Some(error), + Utf8(ref error) => Some(error), + Error::__Nonexhaustive(ref void) => match *void {}, _ => None, } } @@ -124,16 +114,6 @@ impl From for Error { } } -#[cfg(feature = "openssl")] -impl From for Error { - fn from(err: SslError) -> Error { - match err { - SslError::StreamError(err) => Io(err), - err => Ssl(Box::new(err)), - } - } -} - impl From for Error { fn from(err: Utf8Error) -> Error { Utf8(err) @@ -181,9 +161,9 @@ mod tests { ($from:expr => $error:pat) => { match Error::from($from) { e @ $error => { - assert!(e.description().len() > 5); + assert!(e.description().len() >= 5); } , - _ => panic!("{:?}", $from) + e => panic!("{:?}", e) } } } diff --git a/src/header/common/access_control_allow_methods.rs b/src/header/common/access_control_allow_methods.rs index 7917e1992d..83e88ae3f8 100644 --- a/src/header/common/access_control_allow_methods.rs +++ b/src/header/common/access_control_allow_methods.rs @@ -19,7 +19,7 @@ header! { /// # Examples /// ``` /// use hyper::header::{Headers, AccessControlAllowMethods}; - /// use hyper::method::Method; + /// use hyper::Method; /// /// let mut headers = Headers::new(); /// headers.set( @@ -28,7 +28,7 @@ header! { /// ``` /// ``` /// use hyper::header::{Headers, AccessControlAllowMethods}; - /// use hyper::method::Method; + /// use hyper::Method; /// /// let mut headers = Headers::new(); /// headers.set( diff --git a/src/header/common/access_control_request_method.rs b/src/header/common/access_control_request_method.rs index afeb4e296f..ec0b99275d 100644 --- a/src/header/common/access_control_request_method.rs +++ b/src/header/common/access_control_request_method.rs @@ -17,7 +17,7 @@ header! { /// # Examples /// ``` /// use hyper::header::{Headers, AccessControlRequestMethod}; - /// use hyper::method::Method; + /// use hyper::Method; /// /// let mut headers = Headers::new(); /// headers.set(AccessControlRequestMethod(Method::Get)); diff --git a/src/header/common/allow.rs b/src/header/common/allow.rs index a43220a6e8..5e76d98b41 100644 --- a/src/header/common/allow.rs +++ b/src/header/common/allow.rs @@ -21,7 +21,7 @@ header! { /// # Examples /// ``` /// use hyper::header::{Headers, Allow}; - /// use hyper::method::Method; + /// use hyper::Method; /// /// let mut headers = Headers::new(); /// headers.set( @@ -30,7 +30,7 @@ header! 
{ /// ``` /// ``` /// use hyper::header::{Headers, Allow}; - /// use hyper::method::Method; + /// use hyper::Method; /// /// let mut headers = Headers::new(); /// headers.set( diff --git a/src/header/common/content_length.rs b/src/header/common/content_length.rs index 3d7328518a..dd2536b1f6 100644 --- a/src/header/common/content_length.rs +++ b/src/header/common/content_length.rs @@ -72,7 +72,6 @@ impl fmt::Display for ContentLength { } __hyper__deref!(ContentLength => u64); -__hyper_generate_header_serialization!(ContentLength); __hyper__tm!(ContentLength, tests { // Testcase from RFC diff --git a/src/header/common/mod.rs b/src/header/common/mod.rs index 6a886fde23..71eab162cc 100644 --- a/src/header/common/mod.rs +++ b/src/header/common/mod.rs @@ -182,31 +182,6 @@ macro_rules! test_header { } } -#[doc(hidden)] -#[macro_export] -macro_rules! __hyper_generate_header_serialization { - ($id:ident) => { - #[cfg(feature = "serde-serialization")] - impl ::serde::Serialize for $id { - fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> - where S: ::serde::Serializer { - format!("{}", self).serialize(serializer) - } - } - - #[cfg(feature = "serde-serialization")] - impl ::serde::Deserialize for $id { - fn deserialize(deserializer: &mut D) -> Result<$id, D::Error> - where D: ::serde::Deserializer { - let string_representation: String = - try!(::serde::Deserialize::deserialize(deserializer)); - let raw = string_representation.into_bytes().into(); - Ok($crate::header::Header::parse_header(&raw).unwrap()) - } - } - } -} - #[macro_export] macro_rules! header { // $a:meta: Attributes associated with the header item (usually docs) @@ -238,8 +213,6 @@ macro_rules! header { self.fmt_header(f) } } - - __hyper_generate_header_serialization!($id); }; // List header, one or more items ($(#[$a:meta])*($id:ident, $n:expr) => ($item:ty)+) => { @@ -265,7 +238,6 @@ macro_rules! header { self.fmt_header(f) } } - __hyper_generate_header_serialization!($id); }; // Single value header ($(#[$a:meta])*($id:ident, $n:expr) => [$value:ty]) => { @@ -290,7 +262,6 @@ macro_rules! header { ::std::fmt::Display::fmt(&**self, f) } } - __hyper_generate_header_serialization!($id); }; // List header, one or more items with "*" option ($(#[$a:meta])*($id:ident, $n:expr) => {Any / ($item:ty)+}) => { @@ -330,7 +301,6 @@ macro_rules! 
header { self.fmt_header(f) } } - __hyper_generate_header_serialization!($id); }; // optional test module @@ -421,4 +391,4 @@ mod transfer_encoding; mod upgrade; mod user_agent; mod vary; -mod warning; \ No newline at end of file +mod warning; diff --git a/src/header/mod.rs b/src/header/mod.rs index f5929ccfde..7c18af7ca9 100644 --- a/src/header/mod.rs +++ b/src/header/mod.rs @@ -85,11 +85,6 @@ use unicase::UniCase; use self::internals::{Item, VecMap, Entry}; -#[cfg(feature = "serde-serialization")] -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -#[cfg(feature = "serde-serialization")] -use serde::de; - pub use self::shared::*; pub use self::common::*; pub use self::raw::Raw; @@ -437,44 +432,6 @@ impl fmt::Debug for Headers { } } -#[cfg(feature = "serde-serialization")] -impl Serialize for Headers { - fn serialize(&self, serializer: &mut S) -> Result<(), S::Error> - where S: Serializer - { - let mut state = try!(serializer.serialize_map(Some(self.len()))); - for header in self.iter() { - try!(serializer.serialize_map_key(&mut state, header.name())); - try!(serializer.serialize_map_value(&mut state, header.value_string())); - } - serializer.serialize_map_end(state) - } -} - -#[cfg(feature = "serde-serialization")] -impl Deserialize for Headers { - fn deserialize(deserializer: &mut D) -> Result where D: Deserializer { - struct HeadersVisitor; - - impl de::Visitor for HeadersVisitor { - type Value = Headers; - - fn visit_map(&mut self, mut visitor: V) -> Result - where V: de::MapVisitor { - let mut result = Headers::new(); - while let Some((key, value)) = try!(visitor.visit()) { - let (key, value): (String, String) = (key, value); - result.set_raw(key, vec![value.into_bytes()]); - } - try!(visitor.end()); - Ok(result) - } - } - - deserializer.deserialize_map(HeadersVisitor) - } -} - /// An `Iterator` over the fields in a `Headers` map. #[allow(missing_debug_implementations)] pub struct HeadersItems<'a> { diff --git a/src/http/body.rs b/src/http/body.rs new file mode 100644 index 0000000000..b8ade3d76b --- /dev/null +++ b/src/http/body.rs @@ -0,0 +1,97 @@ +use std::convert::From; +use std::sync::Arc; + +use tokio_proto; +use http::Chunk; +use futures::{Poll, Stream}; +use futures::sync::mpsc; + +pub type TokioBody = tokio_proto::streaming::Body; + +/// A `Stream` for `Chunk`s used in requests and responses. 
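A sketch of constructing the `Body` type this new module defines just below, using its constructors and `From` conversions (the `hyper::Body` and `hyper::Chunk` re-export paths are assumptions of this example):

```rust
extern crate futures;
extern crate hyper;

use futures::Sink;
use hyper::{Body, Chunk};

fn main() {
    // Fixed payloads convert straight into a Body.
    let _from_str: Body = "hello world".into();
    let _from_vec: Body = vec![0u8; 16].into();
    let _empty = Body::empty();

    // A streaming body comes with a sender half for pushing Chunks later.
    // Sending returns a future (futures are lazy); in real use it would be
    // driven on the event loop alongside the request or response.
    let (tx, _body) = Body::pair();
    let _push = tx.send(Ok(Chunk::from("streamed chunk")));
}
```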
+#[derive(Debug)] +pub struct Body(TokioBody); + +impl Body { + /// Return an empty body stream + pub fn empty() -> Body { + Body(TokioBody::empty()) + } + + /// Return a body stream with an associated sender half + pub fn pair() -> (mpsc::Sender>, Body) { + let (tx, rx) = TokioBody::pair(); + let rx = Body(rx); + (tx, rx) + } +} + +impl Stream for Body { + type Item = Chunk; + type Error = ::Error; + + fn poll(&mut self) -> Poll, ::Error> { + self.0.poll() + } +} + +impl From for tokio_proto::streaming::Body { + fn from(b: Body) -> tokio_proto::streaming::Body { + b.0 + } +} + +impl From> for Body { + fn from(tokio_body: tokio_proto::streaming::Body) -> Body { + Body(tokio_body) + } +} + +impl From>> for Body { + fn from(src: mpsc::Receiver>) -> Body { + Body(src.into()) + } +} + +impl From for Body { + fn from (chunk: Chunk) -> Body { + Body(TokioBody::from(chunk)) + } +} + +impl From> for Body { + fn from (vec: Vec) -> Body { + Body(TokioBody::from(Chunk::from(vec))) + } +} + +impl From>> for Body { + fn from (vec: Arc>) -> Body { + Body(TokioBody::from(Chunk::from(vec))) + } +} + +impl From<&'static [u8]> for Body { + fn from (slice: &'static [u8]) -> Body { + Body(TokioBody::from(Chunk::from(slice))) + } +} + +impl From for Body { + fn from (s: String) -> Body { + Body(TokioBody::from(Chunk::from(s.into_bytes()))) + } +} + +impl From<&'static str> for Body { + fn from (slice: &'static str) -> Body { + Body(TokioBody::from(Chunk::from(slice.as_bytes()))) + } +} + +fn _assert_send() { + fn _assert() {} + + _assert::(); + _assert::(); +} diff --git a/src/http/buffer.rs b/src/http/buffer.rs index 82fb93f034..3c8ef504f8 100644 --- a/src/http/buffer.rs +++ b/src/http/buffer.rs @@ -1,16 +1,16 @@ use std::cmp; -use std::io::{self, Read}; +use std::io::{self, Read, Write}; use std::ptr; const INIT_BUFFER_SIZE: usize = 4096; -const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; +pub const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; #[derive(Debug, Default)] pub struct Buffer { vec: Vec, - read_pos: usize, - write_pos: usize, + tail: usize, + head: usize, } impl Buffer { @@ -24,7 +24,17 @@ impl Buffer { #[inline] pub fn len(&self) -> usize { - self.read_pos - self.write_pos + self.tail - self.head + } + + #[inline] + fn available(&self) -> usize { + self.vec.len() - self.tail + } + + #[inline] + pub fn is_max_size(&self) -> bool { + self.len() >= MAX_BUFFER_SIZE } #[inline] @@ -34,45 +44,88 @@ impl Buffer { #[inline] pub fn bytes(&self) -> &[u8] { - &self.vec[self.write_pos..self.read_pos] + &self.vec[self.head..self.tail] } #[inline] pub fn consume(&mut self, pos: usize) { - debug_assert!(self.read_pos >= self.write_pos + pos); - self.write_pos += pos; - if self.write_pos == self.read_pos { - self.write_pos = 0; - self.read_pos = 0; + debug_assert!(self.tail >= self.head + pos); + self.head += pos; + if self.head == self.tail { + self.head = 0; + self.tail = 0; + } + } + + pub fn consume_leading_lines(&mut self) { + while !self.is_empty() { + match self.vec[self.head] { + b'\r' | b'\n' => { + self.consume(1); + }, + _ => return + } } } pub fn read_from(&mut self, r: &mut R) -> io::Result { - self.maybe_reserve(); - let n = try!(r.read(&mut self.vec[self.read_pos..])); - self.read_pos += n; + self.maybe_reserve(1); + let n = try!(r.read(&mut self.vec[self.tail..])); + self.tail += n; + self.maybe_reset(); Ok(n) } + pub fn write_into(&mut self, w: &mut W) -> io::Result { + if self.is_empty() { + Ok(0) + } else { + let n = try!(w.write(&mut self.vec[self.head..self.tail])); + self.head += n; + 
self.maybe_reset(); + Ok(n) + } + } + + pub fn write(&mut self, data: &[u8]) -> usize { + trace!("Buffer::write len = {:?}", data.len()); + self.maybe_reserve(data.len()); + let len = cmp::min(self.available(), data.len()); + assert!(self.available() >= len); + unsafe { + // in rust 1.9, we could use slice::copy_from_slice + ptr::copy( + data.as_ptr(), + self.vec.as_mut_ptr().offset(self.tail as isize), + len + ); + } + self.tail += len; + len + } + #[inline] - fn maybe_reserve(&mut self) { + fn maybe_reserve(&mut self, needed: usize) { let cap = self.vec.len(); if cap == 0 { - trace!("reserving initial {}", INIT_BUFFER_SIZE); - self.vec = vec![0; INIT_BUFFER_SIZE]; - } else if self.write_pos > 0 && self.read_pos == cap { - let count = self.read_pos - self.write_pos; + // first reserve + let init = cmp::max(INIT_BUFFER_SIZE, needed); + trace!("reserving initial {}", init); + self.vec = vec![0; init]; + } else if self.head > 0 && self.tail == cap && self.head >= needed { + // there is space to shift over + let count = self.tail - self.head; trace!("moving buffer bytes over by {}", count); unsafe { ptr::copy( - self.vec.as_ptr().offset(self.write_pos as isize), + self.vec.as_ptr().offset(self.head as isize), self.vec.as_mut_ptr(), count ); } - self.read_pos -= count; - self.write_pos = 0; - } else if self.read_pos == cap && cap < MAX_BUFFER_SIZE { + self.tail -= count; + self.head = 0; + } else if self.tail == cap && cap < MAX_BUFFER_SIZE { self.vec.reserve(cmp::min(cap * 4, MAX_BUFFER_SIZE) - cap); let new = self.vec.capacity() - cap; trace!("reserved {}", new); @@ -80,36 +133,11 @@ impl Buffer { } } - pub fn wrap<'a, 'b: 'a, R: io::Read>(&'a mut self, reader: &'b mut R) -> BufReader<'a, R> { - BufReader { - buf: self, - reader: reader - } - } -} - -#[derive(Debug)] -pub struct BufReader<'a, R: io::Read + 'a> { - buf: &'a mut Buffer, - reader: &'a mut R -} - -impl<'a, R: io::Read + 'a> BufReader<'a, R> { - pub fn get_ref(&self) -> &R { - self.reader - } -} - -impl<'a, R: io::Read> Read for BufReader<'a, R> { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - trace!("BufReader.read self={}, buf={}", self.buf.len(), buf.len()); - let n = try!(self.buf.bytes().read(buf)); - self.buf.consume(n); - if n == 0 { - self.buf.reset(); - self.reader.read(&mut buf[n..]) - } else { - Ok(n) + #[inline] + fn maybe_reset(&mut self) { + if self.tail != 0 && self.tail == self.head { + self.tail = 0; + self.head = 0; } } } diff --git a/src/http/channel.rs b/src/http/channel.rs deleted file mode 100644 index ec80148fc8..0000000000 --- a/src/http/channel.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::fmt; -use std::sync::{Arc, mpsc}; -use std::sync::atomic::{AtomicBool, Ordering}; -use ::rotor; - -pub use std::sync::mpsc::TryRecvError; - -pub fn new(notify: rotor::Notifier) -> (Sender, Receiver) { - let b = Arc::new(AtomicBool::new(false)); - let (tx, rx) = mpsc::channel(); - (Sender { - awake: b.clone(), - notify: notify, - tx: tx, - }, - Receiver { - awake: b, - rx: rx, - }) -} - -pub fn share(other: &Sender) -> (Sender, Receiver) { - let (tx, rx) = mpsc::channel(); - let notify = other.notify.clone(); - let b = other.awake.clone(); - (Sender { - awake: b.clone(), - notify: notify, - tx: tx, - }, - Receiver { - awake: b, - rx: rx, - }) -} - -pub struct Sender { - awake: Arc, - notify: rotor::Notifier, - tx: mpsc::Sender, -} - -impl Sender { - pub fn send(&self, val: T) -> Result<(), SendError> { - try!(self.tx.send(val)); - if !self.awake.swap(true, Ordering::SeqCst) { - try!(self.notify.wakeup()); - } - 
Ok(()) - } -} - -impl Clone for Sender { - fn clone(&self) -> Sender { - Sender { - awake: self.awake.clone(), - notify: self.notify.clone(), - tx: self.tx.clone(), - } - } -} - -impl fmt::Debug for Sender { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Sender") - .field("notify", &self.notify) - .finish() - } -} - -#[derive(Debug)] -pub struct SendError(pub Option); - -impl From> for SendError { - fn from(e: mpsc::SendError) -> SendError { - SendError(Some(e.0)) - } -} - -impl From for SendError { - fn from(_e: rotor::WakeupError) -> SendError { - SendError(None) - } -} - -pub struct Receiver { - awake: Arc, - rx: mpsc::Receiver, -} - -impl Receiver { - pub fn try_recv(&self) -> Result { - self.awake.store(false, Ordering::Relaxed); - self.rx.try_recv() - } -} diff --git a/src/http/chunk.rs b/src/http/chunk.rs new file mode 100644 index 0000000000..45179adf76 --- /dev/null +++ b/src/http/chunk.rs @@ -0,0 +1,77 @@ +use std::borrow::Borrow; +use std::fmt; +use std::sync::Arc; + +/// A piece of a message body. +pub struct Chunk(Inner); + +enum Inner { + Owned(Vec), + Referenced(Arc>), + Static(&'static [u8]), +} + +impl From> for Chunk { + #[inline] + fn from(v: Vec) -> Chunk { + Chunk(Inner::Owned(v)) + } +} + +impl From>> for Chunk { + #[inline] + fn from(v: Arc>) -> Chunk { + Chunk(Inner::Referenced(v)) + } +} + +impl From<&'static [u8]> for Chunk { + #[inline] + fn from(slice: &'static [u8]) -> Chunk { + Chunk(Inner::Static(slice)) + } +} + +impl From for Chunk { + #[inline] + fn from(s: String) -> Chunk { + s.into_bytes().into() + } +} + +impl From<&'static str> for Chunk { + #[inline] + fn from(slice: &'static str) -> Chunk { + slice.as_bytes().into() + } +} + +impl ::std::ops::Deref for Chunk { + type Target = [u8]; + + #[inline] + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl AsRef<[u8]> for Chunk { + #[inline] + fn as_ref(&self) -> &[u8] { + match self.0 { + Inner::Owned(ref vec) => vec, + Inner::Referenced(ref vec) => { + let v: &Vec = vec.borrow(); + v.as_slice() + } + Inner::Static(slice) => slice, + } + } +} + +impl fmt::Debug for Chunk { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self.as_ref(), f) + } +} diff --git a/src/http/conn.rs b/src/http/conn.rs index 54cef874c8..9258efbe40 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -1,1144 +1,653 @@ -use std::borrow::Cow; use std::fmt; -use std::hash::Hash; -use std::io; +use std::io::{self, Write}; use std::marker::PhantomData; -use std::mem; -use std::time::Duration; +use std::time::Instant; -use rotor::{self, EventSet, PollOpt, Scope, Time}; +use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend}; +use tokio::io::Io; +use tokio_proto::streaming::pipeline::{Frame, Transport}; -use http::{self, h1, Http1Message, Encoder, Decoder, Next, Next_, Reg, Control}; -use http::channel; -use http::internal::WriteBuf; -use http::buffer::Buffer; -use net::{Transport, Blocked}; +use header::{ContentLength, TransferEncoding}; +use http::{self, Http1Transaction}; +use http::io::{Cursor, Buffered}; +use http::h1::{Encoder, Decoder}; use version::HttpVersion; -const MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; -/// This handles a connection, which will have been established over a -/// Transport (like a socket), and will likely include multiple -/// `Message`s over HTTP. +/// This handles a connection, which will have been established over an +/// `Io` (like a socket), and will likely include multiple +/// `Transaction`s over HTTP. 
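A quick sketch of the three `Chunk` flavors defined above: owned bytes, shared (`Arc`) bytes reused without copying, and borrowed `'static` data with no allocation at all (the `hyper::Chunk` re-export path is an assumption of this example):

```rust
extern crate hyper;

use std::sync::Arc;
use hyper::Chunk;

static PAYLOAD: &'static [u8] = b"static bytes";

fn main() {
    // Owned: takes the Vec as-is.
    let owned = Chunk::from(vec![1u8, 2, 3]);
    // Referenced: shares an Arc'd buffer, e.g. a cached body reused many times.
    let shared = Chunk::from(Arc::new(vec![4u8, 5, 6]));
    // Static: borrows 'static data directly.
    let fixed = Chunk::from(PAYLOAD);

    // All three deref to [u8].
    assert_eq!(owned.len() + shared.len() + fixed.len(), 18);
}
```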
/// -/// The connection will determine when a message begins and ends, creating -/// a new message `MessageHandler` for each one, as well as determine if this -/// connection can be kept alive after the message, or if it is complete. -pub struct Conn>(Box>); - - -/// `ConnInner` contains all of a connections state which Conn proxies for in a way -/// that allows Conn to maintain convenient move and self consuming method call -/// semantics but avoiding many costly memcpy calls. -struct ConnInner> { - buf: Buffer, - ctrl: (channel::Sender, channel::Receiver), - keep_alive_enabled: bool, - key: K, - state: State, - transport: T, - /// Records a WouldBlock error when trying to read - /// - /// This flag is used to prevent busy looping - read_would_block: bool, +/// The connection will determine when a message begins and ends as well as +/// determine if this connection can be kept alive after the message, +/// or if it is complete. +pub struct Conn { + io: Buffered, + state: State, + _marker: PhantomData } -impl> fmt::Debug for ConnInner { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Conn") - .field("keep_alive_enabled", &self.keep_alive_enabled) - .field("state", &self.state) - .field("buf", &self.buf) - .finish() +impl Conn { + pub fn new(io: I, keep_alive: K) -> Conn { + Conn { + io: Buffered::new(io), + state: State { + reading: Reading::Init, + writing: Writing::Init, + keep_alive: keep_alive, + }, + _marker: PhantomData, + } } -} -impl> fmt::Debug for Conn { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.0.fmt(f) + fn parse(&mut self) -> ::Result>> { + self.io.parse::() } -} + fn is_read_ready(&mut self) -> bool { + match self.state.reading { + Reading::Init | + Reading::Body(..) => self.io.poll_read().is_ready(), + Reading::KeepAlive | + Reading::Closed => true, + } + } -impl> ConnInner { - /// Desired Register interest based on state of current connection. - /// - /// This includes the user interest, such as when they return `Next::read()`. - fn interest(&self) -> Reg { - match self.state { - State::Closed => Reg::Remove, - State::Init { interest, .. } => { - interest.register() - } - State::Http1(Http1 { reading: Reading::Closed, writing: Writing::Closed, .. }) => { - Reg::Remove - } - State::Http1(Http1 { ref reading, ref writing, .. }) => { - let read = match *reading { - Reading::Parse | - Reading::Body(..) => Reg::Read, - Reading::Init | - Reading::Wait(..) | - Reading::KeepAlive | - Reading::Closed => Reg::Wait - }; - - let write = match *writing { - Writing::Head | - Writing::Chunk(..) | - Writing::Ready(..) => Reg::Write, - Writing::Init | - Writing::Wait(..) | - Writing::KeepAlive => Reg::Wait, - Writing::Closed => Reg::Wait, - }; + fn is_read_closed(&self) -> bool { + self.state.is_read_closed() + } - match (read, write) { - (Reg::Read, Reg::Write) => Reg::ReadWrite, - (Reg::Read, Reg::Wait) => Reg::Read, - (Reg::Wait, Reg::Write) => Reg::Write, - (Reg::Wait, Reg::Wait) => Reg::Wait, - _ => unreachable!("bad read/write reg combo") - } - } - } + #[allow(unused)] + fn is_write_closed(&self) -> bool { + self.state.is_write_closed() } - /// Actual register action. - /// - /// Considers the user interest(), but also compares if the underlying - /// transport is blocked(), and adjusts accordingly. 
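For orientation before the new methods: the rewritten `Conn` no longer drives a `Handler`; it is a `Stream` and `Sink` of tokio-proto streaming pipeline `Frame`s. Roughly, a response with a body crosses the transport as a `Message` frame, its `Body` chunks, and a terminating empty body frame. A crude standalone sketch with placeholder head and body types (not hyper's actual `MessageHead`/`Chunk`):

```rust
extern crate tokio_proto;

use tokio_proto::streaming::pipeline::Frame;

fn main() {
    // Message head first (body: true says chunks follow), then the chunks,
    // then `chunk: None` to end the stream; `Frame::Error` short-circuits.
    let frames: Vec<Frame<&'static str, &'static [u8], ()>> = vec![
        Frame::Message { message: "response head", body: true },
        Frame::Body { chunk: Some("hello".as_bytes()) },
        Frame::Body { chunk: None },
    ];
    assert_eq!(frames.len(), 3);
}
```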
- fn register(&self) -> Reg { - let reg = self.interest(); - match (reg, self.transport.blocked()) { - (Reg::Remove, _) | - (Reg::Wait, _) | - (_, None) => reg, - - (_, Some(Blocked::Read)) => Reg::Read, - (_, Some(Blocked::Write)) => Reg::Write, + fn can_read_head(&self) -> bool { + match self.state.reading { + Reading::Init => true, + _ => false, } } - fn parse(&mut self) -> ::Result>::Message as Http1Message>::Incoming>> { - match self.buf.read_from(&mut self.transport) { - Ok(0) => { - trace!("parse eof"); - return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "parse eof").into()); - } - Ok(_) => {}, - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => {}, - _ => return Err(e.into()) - } - } - match try!(http::parse::<>::Message, _>(self.buf.bytes())) { - Some((head, len)) => { - trace!("parsed {} bytes out of {}", len, self.buf.len()); - self.buf.consume(len); - Ok(head) - }, - None => { - if self.buf.len() >= MAX_BUFFER_SIZE { - //TODO: Handler.on_too_large_error() - debug!("MAX_BUFFER_SIZE reached, closing"); - Err(::Error::TooLarge) - } else { - Err(io::Error::new(io::ErrorKind::WouldBlock, "incomplete parse").into()) - } - }, + fn can_read_body(&self) -> bool { + match self.state.reading { + Reading::Body(..) => true, + _ => false, } } - fn read>(&mut self, scope: &mut Scope, state: State) -> State { - match state { - State::Init { interest: Next_::Read, .. } => { - let head = match self.parse() { - Ok(head) => head, - Err(::Error::Io(e)) => match e.kind() { - io::ErrorKind::WouldBlock | - io::ErrorKind::Interrupted => { - self.read_would_block = true; - return state; - }, - _ => { - debug!("io error trying to parse {:?}", e); - return State::Closed; - } - }, - Err(e) => { - //TODO: send proper error codes depending on error - trace!("parse eror: {:?}", e); - return State::Closed; - } - }; - let mut handler = match scope.create(Seed(&self.key, &self.ctrl.0)) { - Some(handler) => handler, - None => unreachable!() - }; - match H::Message::decoder(&head) { - Ok(decoder) => { - trace!("decoder = {:?}", decoder); - let keep_alive = self.keep_alive_enabled && head.should_keep_alive(); - let next = handler.on_incoming(head, &self.transport); - trace!("handler.on_incoming() -> {:?}", next); - - let now = scope.now(); - match next.interest { - Next_::Read => self.read(scope, State::Http1(Http1 { - handler: handler, - reading: Reading::Body(decoder), - writing: Writing::Init, - keep_alive: keep_alive, - timeout: next.timeout, - timeout_start: Some(now), - _marker: PhantomData, - })), - Next_::Write => State::Http1(Http1 { - handler: handler, - reading: if decoder.is_eof() { - if keep_alive { - Reading::KeepAlive - } else { - Reading::Closed - } - } else { - Reading::Wait(decoder) - }, - writing: Writing::Head, - keep_alive: keep_alive, - timeout: next.timeout, - timeout_start: Some(now), - _marker: PhantomData, - }), - Next_::ReadWrite => self.read(scope, State::Http1(Http1 { - handler: handler, - reading: Reading::Body(decoder), - writing: Writing::Head, - keep_alive: keep_alive, - timeout: next.timeout, - timeout_start: Some(now), - _marker: PhantomData, - })), - Next_::Wait => State::Http1(Http1 { - handler: handler, - reading: Reading::Wait(decoder), - writing: Writing::Init, - keep_alive: keep_alive, - timeout: next.timeout, - timeout_start: Some(now), - _marker: PhantomData, - }), - Next_::End | - Next_::Remove => State::Closed, - } - }, - Err(e) => { - debug!("error creating decoder: {:?}", e); - //TODO: update state from returned Next - //this would allow a Server to 
respond with a proper - //4xx code - let _ = handler.on_error(e); - State::Closed - } - } - }, - State::Init { interest: Next_::Wait, .. } => { - match self.buf.read_from(&mut self.transport) { - Ok(0) => { - // End-of-file; connection was closed by peer - State::Closed - }, - Ok(n) => { - // Didn't expect bytes here! Close the connection. - warn!("read {} bytes in State::Init with Wait interest", n); - State::Closed - }, - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => { - // This is the expected case reading in this state - self.read_would_block = true; - state - }, - _ => { - warn!("socket error reading State::Init with Wait interest: {}", e); - State::Closed - } - } - } - }, - State::Init { .. } => { - trace!("on_readable State::{:?}", state); - state - }, - State::Http1(mut http1) => { - let next = match http1.reading { - Reading::Init => None, - Reading::Parse => match self.parse() { - Ok(head) => match H::Message::decoder(&head) { - Ok(decoder) => { - trace!("decoder = {:?}", decoder); - // if client request asked for keep alive, - // then it depends entirely on if the server agreed - if http1.keep_alive { - http1.keep_alive = head.should_keep_alive(); - } - let next = http1.handler.on_incoming(head, &self.transport); - http1.reading = Reading::Wait(decoder); - trace!("handler.on_incoming() -> {:?}", next); - Some(next) - }, - Err(e) => { - debug!("error creating decoder: {:?}", e); - //TODO: respond with 400 - return State::Closed; - } - }, - Err(::Error::Io(e)) => match e.kind() { - io::ErrorKind::WouldBlock | - io::ErrorKind::Interrupted => { - self.read_would_block = true; - None - }, - _ => { - debug!("io error trying to parse {:?}", e); - return State::Closed; - } - }, - Err(e) => { - trace!("parse error: {:?}", e); - let _ = http1.handler.on_error(e); - return State::Closed; - } - }, - Reading::Body(ref mut decoder) => { - let wrapped = if !self.buf.is_empty() { - super::Trans::Buf(self.buf.wrap(&mut self.transport)) - } else { - super::Trans::Port(&mut self.transport) - }; - - Some(http1.handler.on_decode(&mut Decoder::h1(decoder, wrapped))) - }, - _ => { - trace!("Conn.on_readable State::Http1(reading = {:?})", http1.reading); - None - } - }; - let mut s = State::Http1(http1); - if let Some(next) = next { - s.update(next, &**scope, Some(scope.now())); - } - trace!("Conn.on_readable State::Http1 completed, new state = State::{:?}", s); - - let again = match s { - State::Http1(Http1 { reading: Reading::Body(ref encoder), .. }) => encoder.is_eof(), - _ => false - }; + fn read_head(&mut self) -> Poll, http::Chunk, ::Error>>, io::Error> { + debug_assert!(self.can_read_head()); + trace!("Conn::read_head"); - if again { - self.read(scope, s) + let (version, head) = match self.parse() { + Ok(Some(head)) => (head.version, head), + Ok(None) => return Ok(Async::NotReady), + Err(e) => { + self.state.close(); + self.io.consume_leading_lines(); + if !self.io.read_buf().is_empty() { + error!("parse error ({}) with bytes: {:?}", e, self.io.read_buf()); + return Ok(Async::Ready(Some(Frame::Error { error: e }))); } else { - s + trace!("parse error with 0 input, err = {:?}", e); + return Ok(Async::Ready(None)); } - }, - State::Closed => { - trace!("on_readable State::Closed"); - State::Closed } - } - } + }; - fn write>(&mut self, scope: &mut Scope, mut state: State) -> State { - let next = match state { - State::Init { interest: Next_::Write, .. 
} => { - // this is a Client request, which writes first, so pay - // attention to the version written here, which will adjust - // our internal state to Http1 or Http2 - let mut handler = match scope.create(Seed(&self.key, &self.ctrl.0)) { - Some(handler) => handler, - None => { - trace!("could not create handler {:?}", self.key); - return State::Closed; + match version { + HttpVersion::Http10 | HttpVersion::Http11 => { + let decoder = match T::decoder(&head) { + Ok(d) => d, + Err(e) => { + error!("decoder error = {:?}", e); + self.state.close(); + return Ok(Async::Ready(Some(Frame::Error { error: e }))); } }; - let mut head = http::MessageHead::default(); - let mut interest = handler.on_outgoing(&mut head); - if head.version == HttpVersion::Http11 { - let mut buf = Vec::new(); - let keep_alive = self.keep_alive_enabled && head.should_keep_alive(); - let mut encoder = H::Message::encode(head, &mut buf); - let writing = match interest.interest { - // user wants to write some data right away - // try to write the headers and the first chunk - // together, so they are in the same packet - Next_::Write | - Next_::ReadWrite => { - encoder.prefix(WriteBuf { - bytes: buf, - pos: 0 - }); - interest = handler.on_encode(&mut Encoder::h1(&mut encoder, &mut self.transport)); - Writing::Ready(encoder) - }, - _ => Writing::Chunk(Chunk { - buf: Cow::Owned(buf), - pos: 0, - next: (encoder, interest.clone()) - }) - }; - state = State::Http1(Http1 { - reading: Reading::Init, - writing: writing, - handler: handler, - keep_alive: keep_alive, - timeout: interest.timeout, - timeout_start: Some(scope.now()), - _marker: PhantomData, - }) - } - Some(interest) - } - State::Init { .. } => { - trace!("Conn.on_writable State::{:?}", state); - None - } - State::Http1(Http1 { ref mut handler, ref mut writing, ref mut keep_alive, .. }) => { - match *writing { - Writing::Init => { - trace!("Conn.on_writable Http1::Writing::Init"); - None - } - Writing::Head => { - let mut head = http::MessageHead::default(); - let mut interest = handler.on_outgoing(&mut head); - // if the request wants to close, server cannot stop it - if *keep_alive { - // if the request wants to stay alive, then it depends - // on the server to agree - *keep_alive = head.should_keep_alive(); - } - let mut buf = Vec::new(); - let mut encoder = <>::Message as Http1Message>::encode(head, &mut buf); - *writing = match interest.interest { - // user wants to write some data right away - // try to write the headers and the first chunk - // together, so they are in the same packet - Next_::Write | - Next_::ReadWrite => { - encoder.prefix(WriteBuf { - bytes: buf, - pos: 0 - }); - interest = handler.on_encode(&mut Encoder::h1(&mut encoder, &mut self.transport)); - Writing::Ready(encoder) - }, - _ => Writing::Chunk(Chunk { - buf: Cow::Owned(buf), - pos: 0, - next: (encoder, interest.clone()) - }) - }; - Some(interest) - }, - Writing::Chunk(ref mut chunk) => { - trace!("Http1.Chunk on_writable"); - match self.transport.write(&chunk.buf.as_ref()[chunk.pos..]) { - Ok(n) => { - chunk.pos += n; - trace!("Http1.Chunk wrote={}, done={}", n, chunk.is_written()); - if chunk.is_written() { - Some(chunk.next.1.clone()) - } else { - None - } - }, - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock | - io::ErrorKind::Interrupted => None, - _ => { - Some(handler.on_error(e.into())) - } - } - } - }, - Writing::Ready(ref mut encoder) => { - trace!("Http1.Ready on_writable"); - Some(handler.on_encode(&mut Encoder::h1(encoder, &mut self.transport))) - }, - Writing::Wait(..) 
=> { - trace!("Conn.on_writable Http1::Writing::Wait"); - None - } - Writing::KeepAlive => { - trace!("Conn.on_writable Http1::Writing::KeepAlive"); - None - } - Writing::Closed => { - trace!("on_writable Http1::Writing::Closed"); - None - } - } + let wants_keep_alive = head.should_keep_alive(); + self.state.keep_alive &= wants_keep_alive; + let (body, reading) = if decoder.is_eof() { + (false, Reading::KeepAlive) + } else { + (true, Reading::Body(decoder)) + }; + self.state.reading = reading; + return Ok(Async::Ready(Some(Frame::Message { message: head, body: body }))); }, - State::Closed => { - trace!("on_writable State::Closed"); - None + _ => { + error!("unimplemented HTTP Version = {:?}", version); + self.state.close(); + return Ok(Async::Ready(Some(Frame::Error { error: ::Error::Version }))); } - }; - - if let Some(next) = next { - state.update(next, &**scope, Some(scope.now())); } - state } - fn can_read_more(&self, was_init: bool) -> bool { - let transport_blocked = self.transport.blocked().is_some(); - let read_would_block = self.read_would_block; + fn read_body(&mut self) -> Poll, io::Error> { + debug_assert!(self.can_read_body()); - let state_machine_ok = match self.state { - State::Init { .. } => !was_init && !self.buf.is_empty(), - _ => !self.buf.is_empty() - }; + trace!("Conn::read_body"); - !transport_blocked && !read_would_block && state_machine_ok - } + let (reading, ret) = match self.state.reading { + Reading::Body(ref mut decoder) => { + //TODO use an appendbuf or something + let mut buf = vec![0; 1024 * 4]; + let n = try_nb!(decoder.decode(&mut self.io, &mut buf)); + if n > 0 { + buf.truncate(n); + return Ok(Async::Ready(Some(http::Chunk::from(buf)))); + } else { + if decoder.is_eof() { + (Reading::KeepAlive, Ok(Async::Ready(None))) + } else { + (Reading::Closed, Ok(Async::Ready(None))) + } + } - fn on_error(&mut self, err: ::Error, factory: &F) where F: MessageHandlerFactory { - debug!("on_error err = {:?}", err); - trace!("on_error state = {:?}", self.state); - let next = match self.state { - State::Init { .. } => Next::remove(), - State::Http1(ref mut http1) => http1.handler.on_error(err), - State::Closed => Next::remove(), + }, + Reading::Init | Reading::KeepAlive | Reading::Closed => unreachable!() }; - self.state.update(next, factory, None); + self.state.reading = reading; + ret } - fn on_readable(&mut self, scope: &mut Scope) - where F: MessageHandlerFactory { - // Clear would_block flag so state is clear going into read - self.read_would_block = false; - trace!("on_readable -> {:?}", self.state); - let state = mem::replace(&mut self.state, State::Closed); - self.state = self.read(scope, state); - trace!("on_readable <- {:?}", self.state); - } - - fn on_writable(&mut self, scope: &mut Scope) - where F: MessageHandlerFactory { - trace!("on_writable -> {:?}", self.state); - let state = mem::replace(&mut self.state, State::Closed); - self.state = self.write(scope, state); - trace!("on_writable <- {:?}", self.state); - } - - fn on_remove(self) { - debug!("on_remove"); - match self.state { - State::Init { .. 
} | State::Closed => (), - State::Http1(http1) => http1.handler.on_remove(self.transport), + fn can_write_head(&self) -> bool { + match self.state.writing { + Writing::Init => true, + _ => false } } -} - -pub enum ReadyResult { - Continue(C), - Done(Option<(C, Option)>) -} - -impl> Conn { - pub fn new( - key: K, - transport: T, - next: Next, - notify: rotor::Notifier, - now: Time - ) -> Conn { - Conn(Box::new(ConnInner { - buf: Buffer::new(), - ctrl: channel::new(notify), - keep_alive_enabled: true, - key: key, - state: State::Init { - interest: next.interest, - timeout: next.timeout, - timeout_start: Some(now), - }, - transport: transport, - read_would_block: false, - })) - } - - pub fn keep_alive(mut self, val: bool) -> Conn { - self.0.keep_alive_enabled = val; - self + fn can_write_body(&self) -> bool { + match self.state.writing { + Writing::Body(..) => true, + _ => false + } } - pub fn ready( - mut self, - events: EventSet, - scope: &mut Scope - ) -> ReadyResult - where F: MessageHandlerFactory - { - trace!("Conn::ready events='{:?}', blocked={:?}", events, self.0.transport.blocked()); - - if events.is_error() { - match self.0.transport.take_socket_error() { - Ok(_) => { - trace!("is_error, but not socket error"); - // spurious? - }, - Err(e) => self.0.on_error(e.into(), &**scope) + fn write_head(&mut self, mut head: http::MessageHead, body: bool) -> StartSend,io::Error> { + debug_assert!(self.can_write_head()); + if !body { + head.headers.remove::(); + //TODO: check that this isn't a response to a HEAD + //request, which could include the content-length + //even if no body is to be written + if T::should_set_length(&head) { + head.headers.set(ContentLength(0)); } } - // if the user had an io interest, but the transport was blocked differently, - // the event needs to be translated to what the user was actually expecting. - // - // Example: - // - User asks for `Next::write(). - // - But transport is in the middle of renegotiating TLS, and is blocked on reading. - // - hyper should not wait on the `write` event, since epoll already - // knows it is writable. We would just loop a whole bunch, and slow down. - // - So instead, hyper waits on the event needed to unblock the transport, `read`. - // - Once epoll detects the transport is readable, it will alert hyper - // with a `readable` event. - // - hyper needs to translate that `readable` event back into a `write`, - // since that is actually what the Handler wants. - - let events = if let Some(blocked) = self.0.transport.blocked() { - let interest = self.0.interest(); - trace!("translating blocked={:?}, interest={:?}", blocked, interest); - match (blocked, interest) { - (Blocked::Read, Reg::Write) => EventSet::writable(), - (Blocked::Write, Reg::Read) => EventSet::readable(), - // otherwise, the transport was blocked on the same thing the user wanted - _ => events - } + let wants_keep_alive = head.should_keep_alive(); + self.state.keep_alive &= wants_keep_alive; + let mut buf = Vec::new(); + let encoder = T::encode(&mut head, &mut buf); + self.io.buffer(buf); + self.state.writing = if body { + Writing::Body(encoder, None) } else { - events + Writing::KeepAlive }; - let was_init = match self.0.state { - State::Init { .. 
-            State::Init { .. } => true,
-            _ => false
-        };
+        Ok(AsyncSink::Ready)
+    }

-        if events.is_readable() {
-            self.0.on_readable(scope);
-        }
+    fn write_body(&mut self, chunk: Option<http::Chunk>) -> StartSend<Option<http::Chunk>, io::Error> {
+        debug_assert!(self.can_write_body());

-        if events.is_hup() {
-            trace!("Conn::ready got hangup");
-            let _ = scope.deregister(&self.0.transport);
-            self.on_remove();
-            return ReadyResult::Done(None);
-        }
+        let state = match self.state.writing {
+            Writing::Body(ref mut encoder, ref mut queued) => {
+                if queued.is_some() {
+                    return Ok(AsyncSink::NotReady(chunk));
+                }
+                let mut is_done = true;
+                let mut wbuf = Cursor::new(match chunk {
+                    Some(chunk) => {
+                        is_done = false;
+                        chunk
+                    }
+                    None => {
+                        // Encode a zero length chunk
+                        // the http1 encoder does the right thing
+                        // encoding either the final chunk or ignoring the input
+                        http::Chunk::from(Vec::new())
+                    }
+                });

-        if events.is_writable() {
-            self.0.on_writable(scope);
-        }
+                match encoder.encode(&mut self.io, wbuf.buf()) {
+                    Ok(n) => {
+                        wbuf.consume(n);

-        let mut events = match self.0.register() {
-            Reg::Read => EventSet::readable(),
-            Reg::Write => EventSet::writable(),
-            Reg::ReadWrite => EventSet::readable() | EventSet::writable(),
-            Reg::Wait => EventSet::none(),
-            Reg::Remove => {
-                trace!("removing transport");
-                let _ = scope.deregister(&self.0.transport);
-                self.on_remove();
-                return ReadyResult::Done(None);
+                        if !wbuf.is_written() {
+                            *queued = Some(wbuf);
+                        }
+                    },
+                    Err(e) => match e.kind() {
+                        io::ErrorKind::WouldBlock => {
+                            *queued = Some(wbuf);
+                        },
+                        _ => return Err(e)
+                    }
+                }
+
+                if encoder.is_eof() {
+                    Writing::KeepAlive
+                } else if is_done {
+                    Writing::Closed
+                } else {
+                    return Ok(AsyncSink::Ready);
+                }
             },
+            Writing::Init | Writing::KeepAlive | Writing::Closed => unreachable!(),
         };
-
-        if events.is_readable() && self.0.can_read_more(was_init) {
-            return ReadyResult::Continue(self);
-        }
-
-        events = events | EventSet::hup();
-
-        trace!("scope.reregister({:?})", events);
-        match scope.reregister(&self.0.transport, events, PollOpt::level()) {
-            Ok(..) => {
-                let timeout = self.0.state.timeout();
-                ReadyResult::Done(Some((self, timeout)))
+        self.state.writing = state;
+        Ok(AsyncSink::Ready)
+    }
+
+    fn write_queued(&mut self) -> Poll<(), io::Error> {
+        trace!("Conn::write_queued()");
+        match self.state.writing {
+            Writing::Body(ref mut encoder, ref mut queued) => {
+                let complete = if let Some(chunk) = queued.as_mut() {
+                    let n = try_nb!(encoder.encode(&mut self.io, chunk.buf()));
+                    chunk.consume(n);
+                    chunk.is_written()
+                } else {
+                    true
+                };
+                trace!("Conn::write_queued complete = {}", complete);
+                if complete {
+                    *queued = None;
+                    Ok(Async::Ready(()))
+                } else {
+                    Ok(Async::NotReady)
+                }
             },
-            Err(e) => {
-                trace!("error reregistering: {:?}", e);
-                self.0.on_error(e.into(), &**scope);
-                ReadyResult::Done(None)
-            }
+            _ => Ok(Async::Ready(())),
         }
     }

-    pub fn wakeup<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
-    where F: MessageHandlerFactory<K, T, Output=H> {
-        while let Ok(next) = self.0.ctrl.1.try_recv() {
-            trace!("woke up with {:?}", next);
-            let timeout_start = self.0.state.timeout_start();
-            self.0.state.update(next, &**scope, timeout_start);
+    fn flush(&mut self) -> Poll<(), io::Error> {
+        try_ready!(self.write_queued());
+        try_nb!(self.io.flush());
+        self.state.try_keep_alive();
+        trace!("flushed {:?}", self.state);
+        if self.is_read_ready() {
+            ::futures::task::park().unpark();
         }
+        Ok(Async::Ready(()))
-
-        let mut conn = Some(self);
-        loop {
-            match conn.take().unwrap().ready(EventSet::readable() | EventSet::writable(), scope) {
-                ReadyResult::Done(val) => return val,
-                ReadyResult::Continue(c) => conn = Some(c),
-            }
-        }
     }
+}

-    pub fn timeout<F>(mut self, scope: &mut Scope<F>) -> Option<(Self, Option<Duration>)>
-    where F: MessageHandlerFactory<K, T, Output=H> {
-        // Run error handler if timeout has elapsed
-        if self.0.state.timeout_elapsed(scope.now()) {
-            self.0.on_error(::Error::Timeout, &**scope);
-        }
-
-        let mut conn = Some(self);
-        loop {
-            match conn.take().unwrap().ready(EventSet::none(), scope) {
-                ReadyResult::Done(val) => return val,
-                ReadyResult::Continue(c) => conn = Some(c),
-            }
+impl<I, T, K> Stream for Conn<I, T, K>
+where I: Io,
+      T: Http1Transaction,
+      K: KeepAlive,
+      T::Outgoing: fmt::Debug {
+    type Item = Frame<http::MessageHead<T::Incoming>, http::Chunk, ::Error>;
+    type Error = io::Error;
+
+    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+        trace!("Conn::poll()");
+
+        if self.is_read_closed() {
+            trace!("Conn::poll when closed");
+            Ok(Async::Ready(None))
+        } else if self.can_read_head() {
+            self.read_head()
+        } else if self.can_read_body() {
+            self.read_body()
+                .map(|async| async.map(|chunk| Some(Frame::Body {
+                    chunk: chunk
+                })))
+        } else {
+            trace!("poll when on keep-alive");
+            Ok(Async::NotReady)
         }
     }
+}
+
+impl<I, T, K> Sink for Conn<I, T, K>
+where I: Io,
+      T: Http1Transaction,
+      K: KeepAlive,
+      T::Outgoing: fmt::Debug {
+    type SinkItem = Frame<http::MessageHead<T::Outgoing>, http::Chunk, ::Error>;
+    type SinkError = io::Error;
+
+    fn start_send(&mut self, frame: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+        trace!("Conn::start_send( frame={:?} )", DebugFrame(&frame));
+
+        let frame: Self::SinkItem = match frame {
+            Frame::Message { message: head, body } => {
+                if self.can_write_head() {
+                    return self.write_head(head, body)
+                        .map(|async| {
+                            match async {
+                                AsyncSink::Ready => AsyncSink::Ready,
+                                AsyncSink::NotReady(head) => {
+                                    AsyncSink::NotReady(Frame::Message {
+                                        message: head,
+                                        body: body,
+                                    })
+                                }
+                            }
+                        })
+                } else {
+                    Frame::Message { message: head, body: body }
+                }
+            },
+            Frame::Body { chunk } => {
+                if self.can_write_body() {
+                    return self.write_body(chunk)
+                        .map(|async| {
+                            match async {
+                                AsyncSink::Ready => AsyncSink::Ready,
+                                AsyncSink::NotReady(chunk) => AsyncSink::NotReady(Frame::Body {
+                                    chunk: chunk,
+                                })
+                            }
+                        });
+                } else if chunk.is_none() {
+                    return Ok(AsyncSink::Ready);
+                } else {
+                    Frame::Body { chunk: chunk }
+                }
+            },
+            Frame::Error { error } => {
+                debug!("received error, closing: {:?}", error);
+                self.state.close();
+                return Ok(AsyncSink::Ready);
+            },
+        };
+
+        error!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, frame);
+        Err(io::Error::new(io::ErrorKind::InvalidInput, "illegal frame"))

-    fn on_remove(self) {
-        self.0.on_remove()
     }

-    pub fn key(&self) -> &K {
-        &self.0.key
+    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+        trace!("Conn::poll_complete()");
+        let ret = self.flush();
+        trace!("Conn::flush = {:?}", ret);
+        ret
     }
+}

-    pub fn control(&self) -> Control {
-        Control {
-            tx: self.0.ctrl.0.clone(),
-        }
+impl<I, T, K> Transport for Conn<I, T, K>
+where I: Io + 'static,
+      T: Http1Transaction + 'static,
+      K: KeepAlive + 'static,
+      T::Outgoing: fmt::Debug {}
+
+impl<I, T, K> fmt::Debug for Conn<I, T, K> {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_struct("Conn")
+            .field("state", &self.state)
+            .field("io", &self.io)
+            .finish()
     }
+}

-    pub fn is_idle(&self) -> bool {
-        if let State::Init { interest: Next_::Wait, .. } = self.0.state {
-            true
-        } else {
-            false
+#[derive(Debug)]
+struct State<K> {
+    reading: Reading,
+    writing: Writing,
+    keep_alive: K,
+}
+
+#[derive(Debug)]
+enum Reading {
+    Init,
+    Body(Decoder),
+    KeepAlive,
+    Closed,
+}
+
+#[derive(Debug)]
+enum Writing {
+    Init,
+    Body(Encoder, Option<Cursor<http::Chunk>>),
+    KeepAlive,
+    Closed,
+}
+
+impl ::std::ops::BitAndAssign<bool> for KA {
+    fn bitand_assign(&mut self, enabled: bool) {
+        if !enabled {
+            *self = KA::Disabled;
         }
     }
 }

-enum State<H: MessageHandler<T>, T: Transport> {
-    Init {
-        interest: Next_,
-        timeout: Option<Duration>,
-        timeout_start: Option