From d127201ef22b10ab1d84b3f2215863eb2d03bfcb Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 23 Apr 2018 16:56:26 -0700 Subject: [PATCH] feat(rt): make tokio runtime optional A Cargo feature `runtime` is added, which is enabled by default, that includes the following: - The `client::HttpConnector`, which uses `tokio::net::TcpStream`. - The `server::AddrStream`, which uses `tokio::net::TcpListener`. - The `hyper::rt` module, which includes useful utilities to work with the runtime without needing to import `futures` or `tokio` explicity. Disabling the feature removes many of these niceties, but allows people to use hyper in environments that have an alternative runtime, without needing to download an unused one. --- .travis.yml | 5 +- Cargo.toml | 84 +++++- examples/client.rs | 11 +- examples/hello.rs | 7 +- examples/multi_server.rs | 12 +- examples/params.rs | 3 +- examples/send_file.rs | 3 +- examples/server.rs | 7 +- examples/web_api.rs | 3 +- src/client/connect.rs | 636 ++++++++++++++++++++------------------- src/client/mod.rs | 15 +- src/client/pool.rs | 4 +- src/client/tests.rs | 1 + src/common/exec.rs | 13 +- src/error.rs | 3 + src/lib.rs | 11 +- src/mock.rs | 18 +- src/rt.rs | 11 + src/server/conn.rs | 23 +- src/server/mod.rs | 28 +- src/server/tcp.rs | 7 +- 21 files changed, 518 insertions(+), 387 deletions(-) create mode 100644 src/rt.rs diff --git a/.travis.yml b/.travis.yml index 50ad988fa1..d68e9cf5ce 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,13 +9,12 @@ matrix: - rust: beta - rust: stable env: HYPER_DOCS=1 + - rust: stable + env: FEATURES="--no-default-features" - rust: 1.21.0 cache: apt: true - directories: - - target/debug/deps - - target/debug/build script: - ./.travis/readme.py diff --git a/Cargo.toml b/Cargo.toml index 90380e9fae..f3e47952d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,19 +22,21 @@ include = [ [dependencies] bytes = "0.4.4" -futures = "0.1.17" -futures-cpupool = "0.1.6" +futures = "0.1.21" +futures-cpupool = { version = "0.1.6", optional = true } futures-timer = "0.1.0" http = "0.1.5" httparse = "1.0" h2 = "0.1.5" iovec = "0.1" log = "0.4" -net2 = "0.2.32" +net2 = { version = "0.2.32", optional = true } time = "0.1" -tokio = "0.1.5" -tokio-executor = "0.1.0" +tokio = { version = "0.1.5", optional = true } +tokio-executor = { version = "0.1.0", optional = true } tokio-io = "0.1" +tokio-reactor = { version = "0.1", optional = true } +tokio-tcp = { version = "0.1", optional = true } want = "0.0.3" [dev-dependencies] @@ -44,4 +46,76 @@ spmc = "0.2" url = "1.0" [features] +default = ["runtime"] nightly = [] +runtime = [ + "futures-cpupool", + "net2", + "tokio", + "tokio-executor", + "tokio-reactor", + "tokio-tcp", +] + +[[example]] +name = "client" +path = "examples/client.rs" +required-features = ["runtime"] + +[[example]] +name = "hello" +path = "examples/hello.rs" +required-features = ["runtime"] + +[[example]] +name = "multi_server" +path = "examples/multi_server.rs" +required-features = ["runtime"] + +[[example]] +name = "params" +path = "examples/params.rs" +required-features = ["runtime"] + +[[example]] +name = "send_file" +path = "examples/send_file.rs" +required-features = ["runtime"] + +[[example]] +name = "server" +path = "examples/server.rs" +required-features = ["runtime"] + +[[example]] +name = "web_api" +path = "examples/web_api.rs" +required-features = ["runtime"] + + +[[bench]] +name = "end_to_end" +path = "benches/end_to_end.rs" +required-features = ["runtime"] + +[[bench]] +name = "server" +path = "benches/server.rs" +required-features = ["runtime"] + + +[[test]] +name = "client" +path = "tests/client.rs" +required-features = ["runtime"] + +[[test]] +name = "integration" +path = "tests/integration.rs" +required-features = ["runtime"] + +[[test]] +name = "server" +path = "tests/server.rs" +required-features = ["runtime"] + diff --git a/examples/client.rs b/examples/client.rs index cfce85b03b..ffc638471a 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,17 +1,12 @@ -//#![deny(warnings)] -extern crate futures; +#![deny(warnings)] extern crate hyper; -extern crate tokio; - extern crate pretty_env_logger; use std::env; use std::io::{self, Write}; -use futures::{Future, Stream}; -use futures::future::lazy; - use hyper::{Body, Client, Request}; +use hyper::rt::{self, Future, Stream}; fn main() { pretty_env_logger::init(); @@ -30,7 +25,7 @@ fn main() { return; } - tokio::run(lazy(move || { + rt::run(rt::lazy(move || { let client = Client::new(); let mut req = Request::new(Body::empty()); diff --git a/examples/hello.rs b/examples/hello.rs index 302f7bdc2a..7740e8ee65 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -1,13 +1,10 @@ #![deny(warnings)] extern crate hyper; -extern crate futures; extern crate pretty_env_logger; -extern crate tokio; - -use futures::Future; use hyper::{Body, Response, Server}; use hyper::service::service_fn_ok; +use hyper::rt::{self, Future}; static PHRASE: &'static [u8] = b"Hello World!"; @@ -33,5 +30,5 @@ fn main() { println!("Listening on http://{}", addr); - tokio::run(server); + rt::run(server); } diff --git a/examples/multi_server.rs b/examples/multi_server.rs index 3988561cac..b12d6c6990 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -1,14 +1,10 @@ #![deny(warnings)] extern crate hyper; -extern crate futures; extern crate pretty_env_logger; -extern crate tokio; - -use futures::{Future}; -use futures::future::{lazy}; use hyper::{Body, Response, Server}; use hyper::service::service_fn_ok; +use hyper::rt::{self, Future}; static INDEX1: &'static [u8] = b"The 1st service!"; static INDEX2: &'static [u8] = b"The 2nd service!"; @@ -19,7 +15,7 @@ fn main() { let addr1 = ([127, 0, 0, 1], 1337).into(); let addr2 = ([127, 0, 0, 1], 1338).into(); - tokio::run(lazy(move || { + rt::run(rt::lazy(move || { let srv1 = Server::bind(&addr1) .serve(|| service_fn_ok(|_| Response::new(Body::from(INDEX1)))) .map_err(|e| eprintln!("server 1 error: {}", e)); @@ -30,8 +26,8 @@ fn main() { println!("Listening on http://{} and http://{}", addr1, addr2); - tokio::spawn(srv1); - tokio::spawn(srv2); + rt::spawn(srv1); + rt::spawn(srv2); Ok(()) })); diff --git a/examples/params.rs b/examples/params.rs index e3704f70ed..5c057bb851 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -2,7 +2,6 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; -extern crate tokio; extern crate url; use futures::{future, Future, Stream}; @@ -93,5 +92,5 @@ fn main() { .serve(|| service_fn(param_example)) .map_err(|e| eprintln!("server error: {}", e)); - tokio::run(server); + hyper::rt::run(server); } diff --git a/examples/send_file.rs b/examples/send_file.rs index a72bebcd26..76758a8eaf 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -2,7 +2,6 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; -extern crate tokio; use futures::{future, Future}; use futures::sync::oneshot; @@ -29,7 +28,7 @@ fn main() { println!("Listening on http://{}", addr); - tokio::run(server); + hyper::rt::run(server); } type ResponseFuture = Box, Error=io::Error> + Send>; diff --git a/examples/server.rs b/examples/server.rs index b30b115473..e0d300b1c3 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,13 +1,10 @@ #![deny(warnings)] -extern crate futures; extern crate hyper; extern crate pretty_env_logger; -extern crate tokio; - -use futures::Future; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use hyper::service::service_fn_ok; +use hyper::rt::Future; static INDEX: &'static [u8] = b"Try POST /echo"; @@ -40,5 +37,5 @@ fn main() { println!("Listening on http://{}", addr); - tokio::run(server); + hyper::rt::run(server); } diff --git a/examples/web_api.rs b/examples/web_api.rs index d5217b806a..025266e491 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -2,7 +2,6 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; -extern crate tokio; use futures::{future, Future, Stream}; @@ -68,7 +67,7 @@ fn main() { let addr = "127.0.0.1:1337".parse().unwrap(); - tokio::run(future::lazy(move || { + hyper::rt::run(future::lazy(move || { // Share a `Client` with all `Service`s let client = Client::new(); diff --git a/src/client/connect.rs b/src/client/connect.rs index 8c659812c2..83dd1b676f 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -6,26 +6,12 @@ //! establishes connections over TCP. //! - The [`Connect`](Connect) trait and related types to build custom connectors. use std::error::Error as StdError; -use std::fmt; -use std::io; -use std::mem; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::Duration; - -use futures::{Future, Poll, Async}; -use futures::future::{Executor, ExecuteError}; -use futures::sync::oneshot; -use futures_cpupool::{Builder as CpuPoolBuilder}; + +use futures::Future; use http::Uri; -use http::uri::Scheme; -use net2::TcpBuilder; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio::reactor::Handle; -use tokio::net::{TcpStream, ConnectFuture}; -use super::dns; -use self::http_connector::HttpConnectorBlockingTask; +#[cfg(feature = "runtime")] pub use self::http::HttpConnector; /// Connect to a destination, returning an IO transport. /// @@ -135,367 +121,393 @@ impl Connected { */ } -fn connect(addr: &SocketAddr, handle: &Option) -> io::Result { - if let Some(ref handle) = *handle { - let builder = match addr { - &SocketAddr::V4(_) => TcpBuilder::new_v4()?, - &SocketAddr::V6(_) => TcpBuilder::new_v6()?, - }; - - if cfg!(windows) { - // Windows requires a socket be bound before calling connect - let any: SocketAddr = match addr { - &SocketAddr::V4(_) => { - ([0, 0, 0, 0], 0).into() - }, - &SocketAddr::V6(_) => { - ([0, 0, 0, 0, 0, 0, 0, 0], 0).into() - } +#[cfg(feature = "runtime")] +mod http { + use super::*; + + use std::fmt; + use std::io; + use std::mem; + use std::net::SocketAddr; + use std::sync::Arc; + use std::time::Duration; + + use futures::{Async, Poll}; + use futures::future::{Executor, ExecuteError}; + use futures::sync::oneshot; + use futures_cpupool::{Builder as CpuPoolBuilder}; + use http::uri::Scheme; + use net2::TcpBuilder; + use tokio_reactor::Handle; + use tokio_tcp::{TcpStream, ConnectFuture}; + + use super::super::dns; + + use self::http_connector::HttpConnectorBlockingTask; + + + fn connect(addr: &SocketAddr, handle: &Option) -> io::Result { + if let Some(ref handle) = *handle { + let builder = match addr { + &SocketAddr::V4(_) => TcpBuilder::new_v4()?, + &SocketAddr::V6(_) => TcpBuilder::new_v6()?, }; - builder.bind(any)?; - } - Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle)) - } else { - Ok(TcpStream::connect(addr)) - } -} + if cfg!(windows) { + // Windows requires a socket be bound before calling connect + let any: SocketAddr = match addr { + &SocketAddr::V4(_) => { + ([0, 0, 0, 0], 0).into() + }, + &SocketAddr::V6(_) => { + ([0, 0, 0, 0, 0, 0, 0, 0], 0).into() + } + }; + builder.bind(any)?; + } -/// A connector for the `http` scheme. -/// -/// Performs DNS resolution in a thread pool, and then connects over TCP. -#[derive(Clone)] -pub struct HttpConnector { - executor: HttpConnectExecutor, - enforce_http: bool, - handle: Option, - keep_alive_timeout: Option, - nodelay: bool, -} + Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle)) + } else { + Ok(TcpStream::connect(addr)) + } + } -impl HttpConnector { - /// Construct a new HttpConnector. + /// A connector for the `http` scheme. /// - /// Takes number of DNS worker threads. - #[inline] - pub fn new(threads: usize) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, None) + /// Performs DNS resolution in a thread pool, and then connects over TCP. + #[derive(Clone)] + pub struct HttpConnector { + executor: HttpConnectExecutor, + enforce_http: bool, + handle: Option, + keep_alive_timeout: Option, + nodelay: bool, } - /// Construct a new HttpConnector with a specific Tokio handle. - pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, Some(handle)) - } + impl HttpConnector { + /// Construct a new HttpConnector. + /// + /// Takes number of DNS worker threads. + #[inline] + pub fn new(threads: usize) -> HttpConnector { + HttpConnector::new_with_handle_opt(threads, None) + } - fn new_with_handle_opt(threads: usize, handle: Option) -> HttpConnector { - let pool = CpuPoolBuilder::new() - .name_prefix("hyper-dns") - .pool_size(threads) - .create(); - HttpConnector::new_with_executor(pool, handle) - } + /// Construct a new HttpConnector with a specific Tokio handle. + pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { + HttpConnector::new_with_handle_opt(threads, Some(handle)) + } - /// Construct a new HttpConnector. - /// - /// Takes an executor to run blocking tasks on. - pub fn new_with_executor(executor: E, handle: Option) -> HttpConnector - where E: Executor + Send + Sync - { - HttpConnector { - executor: HttpConnectExecutor(Arc::new(executor)), - enforce_http: true, - handle, - keep_alive_timeout: None, - nodelay: false, + fn new_with_handle_opt(threads: usize, handle: Option) -> HttpConnector { + let pool = CpuPoolBuilder::new() + .name_prefix("hyper-dns") + .pool_size(threads) + .create(); + HttpConnector::new_with_executor(pool, handle) } - } - /// Option to enforce all `Uri`s have the `http` scheme. - /// - /// Enabled by default. - #[inline] - pub fn enforce_http(&mut self, is_enforced: bool) { - self.enforce_http = is_enforced; - } + /// Construct a new HttpConnector. + /// + /// Takes an executor to run blocking tasks on. + pub fn new_with_executor(executor: E, handle: Option) -> HttpConnector + where E: Executor + Send + Sync + { + HttpConnector { + executor: HttpConnectExecutor(Arc::new(executor)), + enforce_http: true, + handle, + keep_alive_timeout: None, + nodelay: false, + } + } - /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. - /// - /// If `None`, the option will not be set. - /// - /// Default is `None`. - #[inline] - pub fn set_keepalive(&mut self, dur: Option) { - self.keep_alive_timeout = dur; - } + /// Option to enforce all `Uri`s have the `http` scheme. + /// + /// Enabled by default. + #[inline] + pub fn enforce_http(&mut self, is_enforced: bool) { + self.enforce_http = is_enforced; + } - /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. - /// - /// Default is `false`. - #[inline] - pub fn set_nodelay(&mut self, nodelay: bool) { - self.nodelay = nodelay; + /// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration. + /// + /// If `None`, the option will not be set. + /// + /// Default is `None`. + #[inline] + pub fn set_keepalive(&mut self, dur: Option) { + self.keep_alive_timeout = dur; + } + + /// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`. + /// + /// Default is `false`. + #[inline] + pub fn set_nodelay(&mut self, nodelay: bool) { + self.nodelay = nodelay; + } } -} -impl fmt::Debug for HttpConnector { - #[inline] - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("HttpConnector") - .finish() + impl fmt::Debug for HttpConnector { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("HttpConnector") + .finish() + } } -} -impl Connect for HttpConnector { - type Transport = TcpStream; - type Error = io::Error; - type Future = HttpConnecting; - - fn connect(&self, dst: Destination) -> Self::Future { - trace!( - "Http::connect; scheme={}, host={}, port={:?}", - dst.scheme(), - dst.host(), - dst.port(), - ); - - if self.enforce_http { - if dst.uri.scheme_part() != Some(&Scheme::HTTP) { - return invalid_url(InvalidUrl::NotHttp, &self.handle); + impl Connect for HttpConnector { + type Transport = TcpStream; + type Error = io::Error; + type Future = HttpConnecting; + + fn connect(&self, dst: Destination) -> Self::Future { + trace!( + "Http::connect; scheme={}, host={}, port={:?}", + dst.scheme(), + dst.host(), + dst.port(), + ); + + if self.enforce_http { + if dst.uri.scheme_part() != Some(&Scheme::HTTP) { + return invalid_url(InvalidUrl::NotHttp, &self.handle); + } + } else if dst.uri.scheme_part().is_none() { + return invalid_url(InvalidUrl::MissingScheme, &self.handle); } - } else if dst.uri.scheme_part().is_none() { - return invalid_url(InvalidUrl::MissingScheme, &self.handle); - } - let host = match dst.uri.host() { - Some(s) => s, - None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle), - }; - let port = match dst.uri.port() { - Some(port) => port, - None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 }, - }; + let host = match dst.uri.host() { + Some(s) => s, + None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle), + }; + let port = match dst.uri.port() { + Some(port) => port, + None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 }, + }; - HttpConnecting { - state: State::Lazy(self.executor.clone(), host.into(), port), - handle: self.handle.clone(), - keep_alive_timeout: self.keep_alive_timeout, - nodelay: self.nodelay, + HttpConnecting { + state: State::Lazy(self.executor.clone(), host.into(), port), + handle: self.handle.clone(), + keep_alive_timeout: self.keep_alive_timeout, + nodelay: self.nodelay, + } } } -} -#[inline] -fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConnecting { - HttpConnecting { - state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), - handle: handle.clone(), - keep_alive_timeout: None, - nodelay: false, + #[inline] + fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConnecting { + HttpConnecting { + state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), + handle: handle.clone(), + keep_alive_timeout: None, + nodelay: false, + } } -} - -#[derive(Debug, Clone, Copy)] -enum InvalidUrl { - MissingScheme, - NotHttp, - MissingAuthority, -} -impl fmt::Display for InvalidUrl { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str(self.description()) + #[derive(Debug, Clone, Copy)] + enum InvalidUrl { + MissingScheme, + NotHttp, + MissingAuthority, } -} -impl StdError for InvalidUrl { - fn description(&self) -> &str { - match *self { - InvalidUrl::MissingScheme => "invalid URL, missing scheme", - InvalidUrl::NotHttp => "invalid URL, scheme must be http", - InvalidUrl::MissingAuthority => "invalid URL, missing domain", + impl fmt::Display for InvalidUrl { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) } } -} -/// A Future representing work to connect to a URL. -#[must_use = "futures do nothing unless polled"] -pub struct HttpConnecting { - state: State, - handle: Option, - keep_alive_timeout: Option, - nodelay: bool, -} + impl StdError for InvalidUrl { + fn description(&self) -> &str { + match *self { + InvalidUrl::MissingScheme => "invalid URL, missing scheme", + InvalidUrl::NotHttp => "invalid URL, scheme must be http", + InvalidUrl::MissingAuthority => "invalid URL, missing domain", + } + } + } + /// A Future representing work to connect to a URL. + #[must_use = "futures do nothing unless polled"] + pub struct HttpConnecting { + state: State, + handle: Option, + keep_alive_timeout: Option, + nodelay: bool, + } -enum State { - Lazy(HttpConnectExecutor, String, u16), - Resolving(oneshot::SpawnHandle), - Connecting(ConnectingTcp), - Error(Option), -} + enum State { + Lazy(HttpConnectExecutor, String, u16), + Resolving(oneshot::SpawnHandle), + Connecting(ConnectingTcp), + Error(Option), + } -impl Future for HttpConnecting { - type Item = (TcpStream, Connected); - type Error = io::Error; - - fn poll(&mut self) -> Poll { - loop { - let state; - match self.state { - State::Lazy(ref executor, ref mut host, port) => { - // If the host is already an IP addr (v4 or v6), - // skip resolving the dns and start connecting right away. - if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { - state = State::Connecting(ConnectingTcp { - addrs: addrs, - current: None - }) - } else { - let host = mem::replace(host, String::new()); - let work = dns::Work::new(host, port); - state = State::Resolving(oneshot::spawn(work, executor)); - } - }, - State::Resolving(ref mut future) => { - match try!(future.poll()) { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(addrs) => { + impl Future for HttpConnecting { + type Item = (TcpStream, Connected); + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let state; + match self.state { + State::Lazy(ref executor, ref mut host, port) => { + // If the host is already an IP addr (v4 or v6), + // skip resolving the dns and start connecting right away. + if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { state = State::Connecting(ConnectingTcp { addrs: addrs, - current: None, + current: None }) + } else { + let host = mem::replace(host, String::new()); + let work = dns::Work::new(host, port); + state = State::Resolving(oneshot::spawn(work, executor)); + } + }, + State::Resolving(ref mut future) => { + match try!(future.poll()) { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(addrs) => { + state = State::Connecting(ConnectingTcp { + addrs: addrs, + current: None, + }) + } + }; + }, + State::Connecting(ref mut c) => { + let sock = try_ready!(c.poll(&self.handle)); + + if let Some(dur) = self.keep_alive_timeout { + sock.set_keepalive(Some(dur))?; } - }; - }, - State::Connecting(ref mut c) => { - let sock = try_ready!(c.poll(&self.handle)); - - if let Some(dur) = self.keep_alive_timeout { - sock.set_keepalive(Some(dur))?; - } - sock.set_nodelay(self.nodelay)?; + sock.set_nodelay(self.nodelay)?; - return Ok(Async::Ready((sock, Connected::new()))); - }, - State::Error(ref mut e) => return Err(e.take().expect("polled more than once")), + return Ok(Async::Ready((sock, Connected::new()))); + }, + State::Error(ref mut e) => return Err(e.take().expect("polled more than once")), + } + self.state = state; } - self.state = state; } } -} -impl fmt::Debug for HttpConnecting { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("HttpConnecting") + impl fmt::Debug for HttpConnecting { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("HttpConnecting") + } } -} -struct ConnectingTcp { - addrs: dns::IpAddrs, - current: Option, -} + struct ConnectingTcp { + addrs: dns::IpAddrs, + current: Option, + } -impl ConnectingTcp { - // not a Future, since passing a &Handle to poll - fn poll(&mut self, handle: &Option) -> Poll { - let mut err = None; - loop { - if let Some(ref mut current) = self.current { - match current.poll() { - Ok(ok) => return Ok(ok), - Err(e) => { - trace!("connect error {:?}", e); - err = Some(e); - if let Some(addr) = self.addrs.next() { - debug!("connecting to {}", addr); - *current = connect(&addr, handle)?; - continue; + impl ConnectingTcp { + // not a Future, since passing a &Handle to poll + fn poll(&mut self, handle: &Option) -> Poll { + let mut err = None; + loop { + if let Some(ref mut current) = self.current { + match current.poll() { + Ok(ok) => return Ok(ok), + Err(e) => { + trace!("connect error {:?}", e); + err = Some(e); + if let Some(addr) = self.addrs.next() { + debug!("connecting to {}", addr); + *current = connect(&addr, handle)?; + continue; + } } } + } else if let Some(addr) = self.addrs.next() { + debug!("connecting to {}", addr); + self.current = Some(connect(&addr, handle)?); + continue; } - } else if let Some(addr) = self.addrs.next() { - debug!("connecting to {}", addr); - self.current = Some(connect(&addr, handle)?); - continue; - } - return Err(err.take().expect("missing connect error")); + return Err(err.take().expect("missing connect error")); + } } } -} -// Make this Future unnameable outside of this crate. -mod http_connector { - use super::*; - // Blocking task to be executed on a thread pool. - pub struct HttpConnectorBlockingTask { - pub(super) work: oneshot::Execute - } + // Make this Future unnameable outside of this crate. + mod http_connector { + use super::*; + // Blocking task to be executed on a thread pool. + pub struct HttpConnectorBlockingTask { + pub(super) work: oneshot::Execute + } - impl fmt::Debug for HttpConnectorBlockingTask { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("HttpConnectorBlockingTask") + impl fmt::Debug for HttpConnectorBlockingTask { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("HttpConnectorBlockingTask") + } } - } - impl Future for HttpConnectorBlockingTask { - type Item = (); - type Error = (); + impl Future for HttpConnectorBlockingTask { + type Item = (); + type Error = (); - fn poll(&mut self) -> Poll<(), ()> { - self.work.poll() + fn poll(&mut self) -> Poll<(), ()> { + self.work.poll() + } } } -} -#[derive(Clone)] -struct HttpConnectExecutor(Arc + Send + Sync>); + #[derive(Clone)] + struct HttpConnectExecutor(Arc + Send + Sync>); -impl Executor> for HttpConnectExecutor { - fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { - self.0.execute(HttpConnectorBlockingTask { work: future }) - .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) + impl Executor> for HttpConnectExecutor { + fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { + self.0.execute(HttpConnectorBlockingTask { work: future }) + .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) + } } -} -#[cfg(test)] -mod tests { - #![allow(deprecated)] - use std::io; - use futures::Future; - use super::{Connect, Destination, HttpConnector}; - - #[test] - fn test_errors_missing_authority() { - let uri = "/foo/bar?baz".parse().unwrap(); - let dst = Destination { - uri, - }; - let connector = HttpConnector::new(1); - - assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); - } + #[cfg(test)] + mod tests { + #![allow(deprecated)] + use std::io; + use futures::Future; + use super::{Connect, Destination, HttpConnector}; + + #[test] + fn test_errors_missing_authority() { + let uri = "/foo/bar?baz".parse().unwrap(); + let dst = Destination { + uri, + }; + let connector = HttpConnector::new(1); - #[test] - fn test_errors_enforce_http() { - let uri = "https://example.domain/foo/bar?baz".parse().unwrap(); - let dst = Destination { - uri, - }; - let connector = HttpConnector::new(1); + assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); + } - assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); - } + #[test] + fn test_errors_enforce_http() { + let uri = "https://example.domain/foo/bar?baz".parse().unwrap(); + let dst = Destination { + uri, + }; + let connector = HttpConnector::new(1); + + assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); + } - #[test] - fn test_errors_missing_scheme() { - let uri = "example.domain".parse().unwrap(); - let dst = Destination { - uri, - }; - let connector = HttpConnector::new(1); + #[test] + fn test_errors_missing_scheme() { + let uri = "example.domain".parse().unwrap(); + let dst = Destination { + uri, + }; + let connector = HttpConnector::new(1); - assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); + } } } + diff --git a/src/client/mod.rs b/src/client/mod.rs index f580874e83..448fea7281 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -16,14 +16,15 @@ use body::{Body, Payload}; use common::Exec; use self::pool::{Pool, Poolable, Reservation}; -pub use self::connect::{Connect, HttpConnector}; +pub use self::connect::Connect; +#[cfg(feature = "runtime")] pub use self::connect::HttpConnector; use self::connect::Destination; pub mod conn; pub mod connect; pub(crate) mod dispatch; -mod dns; +#[cfg(feature = "runtime")] mod dns; mod pool; #[cfg(test)] mod tests; @@ -39,6 +40,7 @@ pub struct Client { ver: Ver, } +#[cfg(feature = "runtime")] impl Client { /// Create a new Client with the default config. #[inline] @@ -47,18 +49,22 @@ impl Client { } } +#[cfg(feature = "runtime")] impl Default for Client { fn default() -> Client { Client::new() } } -impl Client { +impl Client<(), Body> { /// Configure a Client. /// /// # Example /// /// ``` + /// # extern crate hyper; + /// # #[cfg(feature = "runtime")] + /// fn run () { /// use hyper::Client; /// /// let client = Client::builder() @@ -66,6 +72,8 @@ impl Client { /// .build_http(); /// # let infer: Client<_, hyper::Body> = client; /// # drop(infer); + /// # } + /// # fn main() {} /// ``` #[inline] pub fn builder() -> Builder { @@ -603,6 +611,7 @@ impl Builder { } /// Builder a client with this configuration and the default `HttpConnector`. + #[cfg(feature = "runtime")] pub fn build_http(&self) -> Client where B: Payload + Send, diff --git a/src/client/pool.rs b/src/client/pool.rs index 0adda3c9d2..fc6ecd62fd 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -652,7 +652,7 @@ mod tests { use std::time::Duration; use futures::{Async, Future}; use futures::future; - use super::{Connecting, Key, Poolable, Pool, Reservation, Exec, Ver}; + use super::{Connecting, Key, Poolable, Pool, Reservation, Ver}; /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] @@ -738,9 +738,11 @@ mod tests { }).wait().unwrap(); } + #[cfg(feature = "runtime")] #[test] fn test_pool_timer_removes_expired() { use std::sync::Arc; + use common::Exec; let runtime = ::tokio::runtime::Runtime::new().unwrap(); let pool = Pool::new(true, Some(Duration::from_millis(100))); diff --git a/src/client/tests.rs b/src/client/tests.rs index c29daa4346..6ea9ce5812 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -1,3 +1,4 @@ +#![cfg(feature = "runtime")] extern crate pretty_env_logger; use std::thread; diff --git a/src/common/exec.rs b/src/common/exec.rs index 2227448c2a..78cb023386 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -2,7 +2,6 @@ use std::fmt; use std::sync::Arc; use futures::future::{Executor, Future}; -use tokio_executor::spawn; /// Either the user provides an executor for background tasks, or we use /// `tokio::spawn`. @@ -19,7 +18,17 @@ impl Exec { F: Future + Send + 'static, { match *self { - Exec::Default => spawn(fut), + Exec::Default => { + #[cfg(feature = "runtime")] + { + ::tokio_executor::spawn(fut) + } + #[cfg(not(feature = "runtime"))] + { + // If no runtime, we need an executor! + panic!("executor must be set") + } + }, Exec::Executor(ref e) => { let _ = e.execute(Box::new(fut)) .map_err(|err| { diff --git a/src/error.rs b/src/error.rs index c62f23c281..3ec68dfaab 100644 --- a/src/error.rs +++ b/src/error.rs @@ -39,6 +39,7 @@ pub(crate) enum Kind { /// Error occurred while connecting. Connect, /// Error creating a TcpListener. + #[cfg(feature = "runtime")] Listen, /// Error accepting on an Incoming stream. Accept, @@ -171,6 +172,7 @@ impl Error { Error::new(Kind::Io, Some(cause.into())) } + #[cfg(feature = "runtime")] pub(crate) fn new_listen>(cause: E) -> Error { Error::new(Kind::Listen, Some(cause.into())) } @@ -258,6 +260,7 @@ impl StdError for Error { Kind::Closed => "connection closed", Kind::Connect => "an error occurred trying to connect", Kind::Canceled => "an operation was canceled internally before starting", + #[cfg(feature = "runtime")] Kind::Listen => "error creating server listener", Kind::Accept => "error accepting connection", Kind::NewService => "calling user's new_service failed", diff --git a/src/lib.rs b/src/lib.rs index eda2fe3352..5d329347cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,18 +18,20 @@ extern crate bytes; #[macro_use] extern crate futures; -extern crate futures_cpupool; +#[cfg(feature = "runtime")] extern crate futures_cpupool; extern crate futures_timer; extern crate h2; extern crate http; extern crate httparse; extern crate iovec; #[macro_use] extern crate log; -extern crate net2; +#[cfg(feature = "runtime")] extern crate net2; extern crate time; -extern crate tokio; -extern crate tokio_executor; +#[cfg(feature = "runtime")] extern crate tokio; +#[cfg(feature = "runtime")] extern crate tokio_executor; #[macro_use] extern crate tokio_io; +#[cfg(feature = "runtime")] extern crate tokio_reactor; +#[cfg(feature = "runtime")] extern crate tokio_tcp; extern crate want; #[cfg(all(test, feature = "nightly"))] @@ -62,3 +64,4 @@ mod headers; mod proto; pub mod server; pub mod service; +#[cfg(feature = "runtime")] pub mod rt; diff --git a/src/mock.rs b/src/mock.rs index b4bdef187b..d8147c2cc9 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -1,6 +1,8 @@ +#[cfg(feature = "runtime")] use std::collections::HashMap; use std::cmp; use std::io::{self, Read, Write}; +#[cfg(feature = "runtime")] use std::sync::{Arc, Mutex}; use bytes::Buf; @@ -8,6 +10,7 @@ use futures::{Async, Poll}; use futures::task::{self, Task}; use tokio_io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "runtime")] use ::client::connect::{Connect, Connected, Destination}; #[derive(Debug)] @@ -112,6 +115,7 @@ impl AsyncIo { self.max_read_vecs = cnt; } + #[cfg(feature = "runtime")] pub fn park_tasks(&mut self, enabled: bool) { self.park_tasks = enabled; } @@ -151,6 +155,7 @@ impl AsyncIo { } */ + #[cfg(feature = "runtime")] fn close(&mut self) { self.block_in(1); assert_eq!(self.inner.vec.len(), self.inner.pos); @@ -282,22 +287,26 @@ impl ::std::ops::Deref for AsyncIo { } } +#[cfg(feature = "runtime")] pub struct Duplex { inner: Arc>, } +#[cfg(feature = "runtime")] struct DuplexInner { handle_read_task: Option, read: AsyncIo, write: AsyncIo, } +#[cfg(feature = "runtime")] impl Read for Duplex { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.lock().unwrap().read.read(buf) } } +#[cfg(feature = "runtime")] impl Write for Duplex { fn write(&mut self, buf: &[u8]) -> io::Result { let mut inner = self.inner.lock().unwrap(); @@ -313,10 +322,11 @@ impl Write for Duplex { } } +#[cfg(feature = "runtime")] impl AsyncRead for Duplex { - } +#[cfg(feature = "runtime")] impl AsyncWrite for Duplex { fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(().into()) @@ -331,10 +341,12 @@ impl AsyncWrite for Duplex { } } +#[cfg(feature = "runtime")] pub struct DuplexHandle { inner: Arc>, } +#[cfg(feature = "runtime")] impl DuplexHandle { pub fn read(&self, buf: &mut [u8]) -> Poll { let mut inner = self.inner.lock().unwrap(); @@ -362,6 +374,7 @@ impl DuplexHandle { } } +#[cfg(feature = "runtime")] impl Drop for DuplexHandle { fn drop(&mut self) { trace!("mock duplex handle drop"); @@ -371,10 +384,12 @@ impl Drop for DuplexHandle { } } +#[cfg(feature = "runtime")] pub struct MockConnector { mocks: Mutex>>, } +#[cfg(feature = "runtime")] impl MockConnector { pub fn new() -> MockConnector { MockConnector { @@ -410,6 +425,7 @@ impl MockConnector { } } +#[cfg(feature = "runtime")] impl Connect for MockConnector { type Transport = Duplex; type Error = io::Error; diff --git a/src/rt.rs b/src/rt.rs new file mode 100644 index 0000000000..f1de164416 --- /dev/null +++ b/src/rt.rs @@ -0,0 +1,11 @@ +//! Default runtime +//! +//! By default, hyper includes the [tokio](https://tokio.rs) runtime. To ease +//! using it, several types are re-exported here. +//! +//! The inclusion of a default runtime can be disabled by turning off hyper's +//! `runtime` Cargo feature. + +pub use futures::{Future, Stream}; +pub use futures::future::{lazy, poll_fn}; +pub use tokio::{run, spawn}; diff --git a/src/server/conn.rs b/src/server/conn.rs index 27eb57b752..d607900a5e 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -9,23 +9,22 @@ //! higher-level [Server](super) API. use std::fmt; -use std::net::SocketAddr; +#[cfg(feature = "runtime")] use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; +#[cfg(feature = "runtime")] use std::time::Duration; use bytes::Bytes; use futures::{Async, Future, Poll, Stream}; use futures::future::{Either, Executor}; use tokio_io::{AsyncRead, AsyncWrite}; -//TODO: change these tokio:: to sub-crates -use tokio::reactor::Handle; +#[cfg(feature = "runtime")] use tokio_reactor::Handle; use common::Exec; use proto; use body::{Body, Payload}; use service::{NewService, Service}; -pub use super::tcp::AddrIncoming; +#[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming; /// A lower-level configuration of the HTTP protocol. /// @@ -190,22 +189,23 @@ impl Http { /// # Example /// /// ``` - /// # extern crate futures; /// # extern crate hyper; - /// # extern crate tokio; /// # extern crate tokio_io; - /// # use futures::Future; + /// # #[cfg(feature = "runtime")] + /// # extern crate tokio; /// # use hyper::{Body, Request, Response}; /// # use hyper::service::Service; /// # use hyper::server::conn::Http; /// # use tokio_io::{AsyncRead, AsyncWrite}; - /// # use tokio::reactor::Handle; + /// # #[cfg(feature = "runtime")] /// # fn run(some_io: I, some_service: S) /// # where /// # I: AsyncRead + AsyncWrite + Send + 'static, /// # S: Service + Send + 'static, /// # S::Future: Send /// # { + /// # use hyper::rt::Future; + /// # use tokio::reactor::Handle; /// let http = Http::new(); /// let conn = http.serve_connection(some_io, some_service); /// @@ -213,7 +213,7 @@ impl Http { /// eprintln!("server connection error: {}", e); /// }); /// - /// tokio::spawn(fut); + /// hyper::rt::spawn(fut); /// # } /// # fn main() {} /// ``` @@ -252,6 +252,7 @@ impl Http { /// to accept connections. Each connection will be processed with the /// `new_service` object provided, creating a new service per /// connection. + #[cfg(feature = "runtime")] pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> where S: NewService, @@ -271,6 +272,7 @@ impl Http { /// to accept connections. Each connection will be processed with the /// `new_service` object provided, creating a new service per /// connection. + #[cfg(feature = "runtime")] pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> where S: NewService, @@ -465,6 +467,7 @@ where // ===== impl SpawnAll ===== +#[cfg(feature = "runtime")] impl SpawnAll { pub(super) fn local_addr(&self) -> SocketAddr { self.serve.incoming.local_addr() diff --git a/src/server/mod.rs b/src/server/mod.rs index cf5487a795..7f6b9bde92 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,15 +17,14 @@ //! ## Example //! //! ```no_run -//! extern crate futures; //! extern crate hyper; -//! extern crate tokio; //! -//! use futures::Future; //! use hyper::{Body, Response, Server}; //! use hyper::service::service_fn_ok; //! +//! # #[cfg(feature = "runtime")] //! fn main() { +//! # use hyper::rt::Future; //! // Construct our SocketAddr to listen on... //! let addr = ([127, 0, 0, 1], 3000).into(); //! @@ -41,18 +40,20 @@ //! .serve(new_service); //! //! // Finally, spawn `server` onto an Executor... -//! tokio::run(server.map_err(|e| { +//! hyper::rt::run(server.map_err(|e| { //! eprintln!("server error: {}", e); //! })); //! } +//! # #[cfg(not(feature = "runtime"))] +//! # fn main() {} //! ``` pub mod conn; -mod tcp; +#[cfg(feature = "runtime")] mod tcp; use std::fmt; -use std::net::SocketAddr; -use std::time::Duration; +#[cfg(feature = "runtime")] use std::net::SocketAddr; +#[cfg(feature = "runtime")] use std::time::Duration; use futures::{Future, Stream, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -62,8 +63,7 @@ use service::{NewService, Service}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... use self::conn::{Http as Http_, SpawnAll}; -//use self::hyper_service::HyperService; -use self::tcp::{AddrIncoming}; +#[cfg(feature = "runtime")] use self::tcp::{AddrIncoming}; /// A listening HTTP server. /// @@ -94,6 +94,7 @@ impl Server { } } +#[cfg(feature = "runtime")] impl Server { /// Binds to the provided address, and returns a [`Builder`](Builder). /// @@ -116,6 +117,7 @@ impl Server { } } +#[cfg(feature = "runtime")] impl Server { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> SocketAddr { @@ -176,7 +178,11 @@ impl Builder { /// /// # Example /// - /// ```rust + /// ``` + /// # extern crate hyper; + /// # fn main() {} + /// # #[cfg(feature = "runtime")] + /// # fn run() { /// use hyper::{Body, Response, Server}; /// use hyper::service::service_fn_ok; /// @@ -195,6 +201,7 @@ impl Builder { /// .serve(new_service); /// /// // Finally, spawn `server` onto an Executor... + /// # } /// ``` pub fn serve(self, new_service: S) -> Server where @@ -215,6 +222,7 @@ impl Builder { } } +#[cfg(feature = "runtime")] impl Builder { /// Set whether TCP keepalive messages are enabled on accepted connections. /// diff --git a/src/server/tcp.rs b/src/server/tcp.rs index c573f55f35..e109294713 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -5,9 +5,8 @@ use std::time::Duration; use futures::{Async, Future, Poll, Stream}; use futures_timer::Delay; -//TODO: change to tokio_tcp::net::TcpListener -use tokio::net::TcpListener; -use tokio::reactor::Handle; +use tokio_tcp::TcpListener; +use tokio_reactor::Handle; use self::addr_stream::AddrStream; @@ -170,7 +169,7 @@ mod addr_stream { use std::net::SocketAddr; use bytes::{Buf, BufMut}; use futures::Poll; - use tokio::net::TcpStream; + use tokio_tcp::TcpStream; use tokio_io::{AsyncRead, AsyncWrite};