From 27b8db3af8852ba8280a2868f703d3230a1db85e Mon Sep 17 00:00:00 2001 From: Sam Reis Date: Thu, 15 Mar 2018 10:46:03 +1100 Subject: [PATCH] feat(lib): convert to use tokio 0.1 BREAKING CHANGE: All uses of `Handle` now need to be new-tokio `Handle`. Co-authored-by: Sean McArthur --- .appveyor.yml | 2 + .travis.yml | 1 + Cargo.toml | 5 +- benches/end_to_end.rs | 66 ++++---- benches/server.rs | 3 +- examples/client.rs | 37 ++--- examples/hello.rs | 19 ++- examples/multi_server.rs | 36 ++--- examples/params.rs | 12 +- examples/send_file.rs | 14 +- examples/server.rs | 12 +- examples/web_api.rs | 29 ++-- src/client/conn.rs | 5 +- src/client/connect.rs | 63 +++++--- src/client/dispatch.rs | 39 ++++- src/client/mod.rs | 99 +++++++----- src/client/pool.rs | 31 ++-- src/client/tests.rs | 26 ++-- src/lib.rs | 5 +- src/mock.rs | 37 ++--- src/server/mod.rs | 186 +++++++++++++++-------- tests/client.rs | 314 ++++++++++++++++++++------------------- tests/server.rs | 142 ++++++++++-------- 23 files changed, 679 insertions(+), 504 deletions(-) diff --git a/.appveyor.yml b/.appveyor.yml index b0afb8a56f..6c3dec25f7 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -1,4 +1,5 @@ environment: + RUST_BACKTRACE: 1 matrix: - TARGET: x86_64-pc-windows-msvc - TARGET: i686-pc-windows-msvc @@ -8,6 +9,7 @@ install: - curl -sSf -o rustup-init.exe https://win.rustup.rs/ - rustup-init.exe -y --default-host %TARGET% - set PATH=%PATH%;C:\Users\appveyor\.cargo\bin + - rustc -vV - cargo -vV build: false diff --git a/.travis.yml b/.travis.yml index 0a493525a9..50ad988fa1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -54,4 +54,5 @@ after_success: env: global: + - RUST_BACKTRACE=1 - secure: KipdEhZsGIrb2W0HsDbC95x8FJ1RKEWPq8uSK8wSZwGw6MtvoZDX0edfrtf4o3/skA0h84yn35ZWF/rpo1ZEesgFY1g+l+me+jtyGvMwEsXTGjNP4oNR2MrDizjO8eYDm4hRUCLEmJVvsq4j7oNVdLGHfdrcnwqk8/NxJsRzqXM= diff --git a/Cargo.toml b/Cargo.toml index 18f9662587..31620180e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,16 +25,19 @@ base64 = "0.9" bytes = "0.4.4" futures = "0.1.17" futures-cpupool = "0.1.6" +futures-timer = "0.1.0" http = "0.1.5" httparse = "1.0" iovec = "0.1" language-tags = "0.2" log = "0.4" mime = "0.3.2" +net2 = "0.2.32" percent-encoding = "1.0" relay = "0.1" time = "0.1" -tokio-core = "0.1.11" +tokio = "0.1.3" +tokio-executor = "0.1.0" tokio-service = "0.1" tokio-io = "0.1" unicase = "2.0" diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index fc2992acd5..8c9aeecf57 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -4,48 +4,47 @@ extern crate futures; extern crate hyper; extern crate test; -extern crate tokio_core; +extern crate tokio; use std::net::SocketAddr; use futures::{Future, Stream}; -use tokio_core::reactor::{Core, Handle}; -use tokio_core::net::TcpListener; +use tokio::runtime::Runtime; +use tokio::net::TcpListener; use hyper::{Body, Method, Request, Response}; +use hyper::server::Http; #[bench] fn get_one_at_a_time(b: &mut test::Bencher) { - let mut core = Core::new().unwrap(); - let handle = core.handle(); - let addr = spawn_hello(&handle); + let mut rt = Runtime::new().unwrap(); + let addr = spawn_hello(&mut rt); - let client = hyper::Client::new(&handle); + let client = hyper::Client::configure() + .build_with_executor(&rt.handle(), rt.executor()); let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap(); b.bytes = 160 * 2 + PHRASE.len() as u64; b.iter(move || { - let work = client.get(url.clone()).and_then(|res| { - res.into_body().into_stream().for_each(|_chunk| { - Ok(()) + client.get(url.clone()) + .and_then(|res| { + res.into_body().into_stream().for_each(|_chunk| { + Ok(()) + }) }) - }); - - core.run(work).unwrap(); + .wait().expect("client wait"); }); } #[bench] fn post_one_at_a_time(b: &mut test::Bencher) { - extern crate pretty_env_logger; - let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); - let addr = spawn_hello(&handle); + let mut rt = Runtime::new().unwrap(); + let addr = spawn_hello(&mut rt); - let client = hyper::Client::new(&handle); + let client = hyper::Client::configure() + .build_with_executor(&rt.handle(), rt.executor()); let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap(); @@ -55,26 +54,24 @@ fn post_one_at_a_time(b: &mut test::Bencher) { let mut req = Request::new(post.into()); *req.method_mut() = Method::POST; *req.uri_mut() = url.clone(); - let work = client.request(req).and_then(|res| { + client.request(req).and_then(|res| { res.into_body().into_stream().for_each(|_chunk| { Ok(()) }) - }); + }).wait().expect("client wait"); - core.run(work).unwrap(); }); } static PHRASE: &'static [u8] = include_bytes!("../CHANGELOG.md"); //b"Hello, World!"; -fn spawn_hello(handle: &Handle) -> SocketAddr { +fn spawn_hello(rt: &mut Runtime) -> SocketAddr { use hyper::server::{const_service, service_fn, NewService}; let addr = "127.0.0.1:0".parse().unwrap(); - let listener = TcpListener::bind(&addr, handle).unwrap(); + let listener = TcpListener::bind(&addr).unwrap(); let addr = listener.local_addr().unwrap(); - let handle2 = handle.clone(); - let http = hyper::server::Http::::new(); + let http = Http::::new(); let service = const_service(service_fn(|req: Request| { req.into_body() @@ -85,16 +82,15 @@ fn spawn_hello(handle: &Handle) -> SocketAddr { }) })); - let mut conns = 0; - handle.spawn(listener.incoming().for_each(move |(socket, _addr)| { - conns += 1; - assert_eq!(conns, 1, "should only need 1 connection"); - handle2.spawn( - http.serve_connection(socket, service.new_service()?) + let srv = listener.incoming() + .into_future() + .map_err(|(e, _inc)| panic!("accept error: {}", e)) + .and_then(move |(accepted, _inc)| { + let socket = accepted.expect("accepted socket"); + http.serve_connection(socket, service.new_service().expect("new_service")) .map(|_| ()) .map_err(|_| ()) - ); - Ok(()) - }).then(|_| Ok(()))); + }); + rt.spawn(srv); return addr } diff --git a/benches/server.rs b/benches/server.rs index 7ddc43b3b1..158bdb1ffc 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -5,6 +5,7 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; extern crate test; +extern crate tokio; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; @@ -30,7 +31,7 @@ macro_rules! bench_server { })).unwrap(); let addr = srv.local_addr().unwrap(); addr_tx.send(addr).unwrap(); - srv.run_until(until_rx.map_err(|_| ())).unwrap(); + tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e))); }); addr_rx.recv().unwrap() diff --git a/examples/client.rs b/examples/client.rs index b5df77e25d..e5d9a2862a 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,15 +1,15 @@ //#![deny(warnings)] extern crate futures; extern crate hyper; -extern crate tokio_core; +extern crate tokio; extern crate pretty_env_logger; use std::env; use std::io::{self, Write}; -use futures::Future; -use futures::stream::Stream; +use futures::{Future, Stream}; +use futures::future::lazy; use hyper::{Body, Client, Request}; @@ -30,22 +30,23 @@ fn main() { return; } - let mut core = tokio_core::reactor::Core::new().unwrap(); - let handle = core.handle(); - let client = Client::new(&handle); + tokio::run(lazy(move || { + let client = Client::default(); - let mut req = Request::new(Body::empty()); - *req.uri_mut() = url; - let work = client.request(req).and_then(|res| { - println!("Response: {}", res.status()); - println!("Headers: {:#?}", res.headers()); + let mut req = Request::new(Body::empty()); + *req.uri_mut() = url; - res.into_parts().1.into_stream().for_each(|chunk| { - io::stdout().write_all(&chunk).map_err(From::from) - }) - }).map(|_| { - println!("\n\nDone."); - }); + client.request(req).and_then(|res| { + println!("Response: {}", res.status()); + println!("Headers: {:#?}", res.headers()); - core.run(work).unwrap(); + res.into_parts().1.into_stream().for_each(|chunk| { + io::stdout().write_all(&chunk).map_err(From::from) + }) + }).map(|_| { + println!("\n\nDone."); + }).map_err(|err| { + eprintln!("Error {}", err); + }) + })); } diff --git a/examples/hello.rs b/examples/hello.rs index f62200f4cf..80d33542db 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -2,6 +2,10 @@ extern crate hyper; extern crate futures; extern crate pretty_env_logger; +extern crate tokio; + +use futures::Future; +use futures::future::lazy; use hyper::{Body, Response}; use hyper::server::{Http, const_service, service_fn}; @@ -16,10 +20,13 @@ fn main() { Ok(Response::new(Body::from(PHRASE))) })); - let server = Http::new() - .sleep_on_errors(true) - .bind(&addr, new_service) - .unwrap(); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().unwrap(); + tokio::run(lazy(move || { + let server = Http::new() + .sleep_on_errors(true) + .bind(&addr, new_service) + .unwrap(); + + println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); + server.run().map_err(|err| eprintln!("Server error {}", err)) + })); } diff --git a/examples/multi_server.rs b/examples/multi_server.rs index cb55f64055..239d7abb90 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -1,14 +1,13 @@ #![deny(warnings)] extern crate hyper; extern crate futures; -extern crate tokio_core; extern crate pretty_env_logger; +extern crate tokio; use futures::{Future, Stream}; -use futures::future::FutureResult; +use futures::future::{FutureResult, lazy}; use hyper::{Body, Method, Request, Response, StatusCode}; -use tokio_core::reactor::Core; use hyper::server::{Http, Service}; static INDEX1: &'static [u8] = b"The 1st service!"; @@ -44,26 +43,23 @@ fn main() { let addr1 = "127.0.0.1:1337".parse().unwrap(); let addr2 = "127.0.0.1:1338".parse().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + tokio::run(lazy(move || { + let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap(); + let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap(); - let srv1 = Http::new().serve_addr_handle(&addr1, &handle, || Ok(Srv(INDEX1))).unwrap(); - let srv2 = Http::new().serve_addr_handle(&addr2, &handle, || Ok(Srv(INDEX2))).unwrap(); + println!("Listening on http://{}", srv1.incoming_ref().local_addr()); + println!("Listening on http://{}", srv2.incoming_ref().local_addr()); - println!("Listening on http://{}", srv1.incoming_ref().local_addr()); - println!("Listening on http://{}", srv2.incoming_ref().local_addr()); + tokio::spawn(srv1.for_each(move |conn| { + tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err))); + Ok(()) + }).map_err(|_| ())); - let handle1 = handle.clone(); - handle.spawn(srv1.for_each(move |conn| { - handle1.spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err))); - Ok(()) - }).map_err(|_| ())); + tokio::spawn(srv2.for_each(move |conn| { + tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err))); + Ok(()) + }).map_err(|_| ())); - let handle2 = handle.clone(); - handle.spawn(srv2.for_each(move |conn| { - handle2.spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err))); Ok(()) - }).map_err(|_| ())); - - core.run(futures::future::empty::<(), ()>()).unwrap(); + })); } diff --git a/examples/params.rs b/examples/params.rs index c632daa88e..d362840f52 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -2,9 +2,11 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; +extern crate tokio; extern crate url; use futures::{Future, Stream}; +use futures::future::lazy; use hyper::{Body, Method, Request, Response, StatusCode}; use hyper::server::{Http, Service}; @@ -22,7 +24,7 @@ impl Service for ParamExample { type Request = Request; type Response = Response; type Error = hyper::Error; - type Future = Box>; + type Future = Box + Send>; fn call(&self, req: Request) -> Self::Future { match (req.method(), req.uri().path()) { @@ -96,7 +98,9 @@ fn main() { pretty_env_logger::init(); let addr = "127.0.0.1:1337".parse().unwrap(); - let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap(); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().unwrap(); + tokio::run(lazy(move || { + let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap(); + println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); + server.run().map_err(|err| eprintln!("Server error {}", err)) + })); } diff --git a/examples/send_file.rs b/examples/send_file.rs index 7e507fe33f..8e6fe917a0 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -2,8 +2,10 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; +extern crate tokio; use futures::{Future/*, Sink*/}; +use futures::future::lazy; use futures::sync::oneshot; use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode}; @@ -17,7 +19,7 @@ use std::thread; static NOTFOUND: &[u8] = b"Not Found"; static INDEX: &str = "examples/send_file_index.html"; -fn simple_file_send(f: &str) -> Box, Error = hyper::Error>> { +fn simple_file_send(f: &str) -> Box, Error = hyper::Error> + Send> { // Serve a file by reading it entirely into memory. As a result // this is limited to serving small files, but it is somewhat // simpler with a little less overhead. @@ -63,7 +65,7 @@ impl Service for ResponseExamples { type Request = Request; type Response = Response; type Error = hyper::Error; - type Future = Box>; + type Future = Box + Send>; fn call(&self, req: Request) -> Self::Future { match (req.method(), req.uri().path()) { @@ -139,7 +141,9 @@ fn main() { pretty_env_logger::init(); let addr = "127.0.0.1:1337".parse().unwrap(); - let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap(); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().unwrap(); + tokio::run(lazy(move || { + let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap(); + println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); + server.run().map_err(|err| eprintln!("Server error {}", err)) + })); } diff --git a/examples/server.rs b/examples/server.rs index 4c8cba4229..b5d2153958 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -2,8 +2,10 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; +extern crate tokio; -use futures::future::FutureResult; +use futures::Future; +use futures::future::{FutureResult, lazy}; use hyper::{Body, Method, Request, Response, StatusCode}; use hyper::server::{Http, Service}; @@ -41,7 +43,9 @@ fn main() { pretty_env_logger::init(); let addr = "127.0.0.1:1337".parse().unwrap(); - let server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().unwrap(); + tokio::run(lazy(move || { + let server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); + println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); + server.run().map_err(|err| eprintln!("Server error {}", err)) + })); } diff --git a/examples/web_api.rs b/examples/web_api.rs index 566dad4c49..d3261e9ae4 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -2,9 +2,11 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; -extern crate tokio_core; +extern crate tokio; use futures::{Future, Stream}; +use futures::future::lazy; +use tokio::reactor::Handle; use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode}; use hyper::server::{Http, Service}; @@ -17,13 +19,13 @@ static URL: &str = "http://127.0.0.1:1337/web_api"; static INDEX: &[u8] = b"test.html"; static LOWERCASE: &[u8] = b"i am a lower case string"; -struct ResponseExamples(tokio_core::reactor::Handle); +struct ResponseExamples(Handle); impl Service for ResponseExamples { type Request = Request; type Response = Response; type Error = hyper::Error; - type Future = Box>; + type Future = Box + Send>; fn call(&self, req: Self::Request) -> Self::Future { match (req.method(), req.uri().path()) { @@ -76,18 +78,13 @@ fn main() { pretty_env_logger::init(); let addr = "127.0.0.1:1337".parse().unwrap(); - let mut core = tokio_core::reactor::Core::new().unwrap(); - let handle = core.handle(); - let client_handle = core.handle(); + tokio::run(lazy(move || { + let handle = Handle::current(); + let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(handle.clone()))).unwrap(); + println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr()); - let serve = Http::new().serve_addr_handle(&addr, &handle, move || Ok(ResponseExamples(client_handle.clone()))).unwrap(); - println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr()); - - let h2 = handle.clone(); - handle.spawn(serve.for_each(move |conn| { - h2.spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err))); - Ok(()) - }).map_err(|_| ())); - - core.run(futures::future::empty::<(), ()>()).unwrap(); + serve.map_err(|_| ()).for_each(move |conn| { + tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err))) + }) + })); } diff --git a/src/client/conn.rs b/src/client/conn.rs index 2a01d0135b..9d43ec2655 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -208,7 +208,10 @@ where } //TODO: replace with `impl Future` when stable - pub(crate) fn send_request_retryable(&mut self, req: Request) -> Box, Error=(::Error, Option>)>> { + pub(crate) fn send_request_retryable(&mut self, req: Request) -> Box, Error=(::Error, Option>)> + Send> + where + B: Send, + { let inner = match self.dispatch.try_send(req) { Ok(rx) => { Either::A(rx.then(move |res| { diff --git a/src/client/connect.rs b/src/client/connect.rs index 296c06d161..33e636c067 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -9,6 +9,7 @@ use std::error::Error as StdError; use std::fmt; use std::io; use std::mem; +use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -18,9 +19,10 @@ use futures::sync::oneshot; use futures_cpupool::{Builder as CpuPoolBuilder}; use http::Uri; use http::uri::Scheme; +use net2::TcpBuilder; use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::Handle; -use tokio::net::{TcpStream, TcpStreamNew}; +use tokio::net::{TcpStream, ConnectFuture}; use super::dns; use self::http_connector::HttpConnectorBlockingTask; @@ -30,13 +32,13 @@ use self::http_connector::HttpConnectorBlockingTask; /// A connector receives a [`Destination`](Destination) describing how a /// connection should be estabilished, and returns a `Future` of the /// ready connection. -pub trait Connect { +pub trait Connect: Send + Sync { /// The connected IO Stream. - type Transport: AsyncRead + AsyncWrite + 'static; + type Transport: AsyncRead + AsyncWrite + Send + 'static; /// An error occured when trying to connect. type Error; /// A Future that will resolve to the connected Transport. - type Future: Future; + type Future: Future + Send; /// Connect to a destination. fn connect(&self, dst: Destination) -> Self::Future; } @@ -133,6 +135,28 @@ impl Connected { */ } +fn connect(addr: &SocketAddr, handle: &Handle) -> io::Result { + let builder = match addr { + &SocketAddr::V4(_) => TcpBuilder::new_v4()?, + &SocketAddr::V6(_) => TcpBuilder::new_v6()?, + }; + + if cfg!(windows) { + // Windows requires a socket be bound before calling connect + let any: SocketAddr = match addr { + &SocketAddr::V4(_) => { + ([0, 0, 0, 0], 0).into() + }, + &SocketAddr::V6(_) => { + ([0, 0, 0, 0, 0, 0, 0, 0], 0).into() + } + }; + builder.bind(any)?; + } + + Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle)) +} + /// A connector for the `http` scheme. /// /// Performs DNS resolution in a thread pool, and then connects over TCP. @@ -162,7 +186,7 @@ impl HttpConnector { /// Takes an executor to run blocking tasks on. #[inline] pub fn new_with_executor(executor: E, handle: &Handle) -> HttpConnector - where E: Executor + where E: Executor + Send + Sync { HttpConnector { executor: HttpConnectExecutor(Arc::new(executor)), @@ -336,7 +360,7 @@ impl fmt::Debug for HttpConnecting { struct ConnectingTcp { addrs: dns::IpAddrs, - current: Option, + current: Option, } impl ConnectingTcp { @@ -352,14 +376,14 @@ impl ConnectingTcp { err = Some(e); if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - *current = TcpStream::connect(&addr, handle); + *current = connect(&addr, handle)?; continue; } } } } else if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - self.current = Some(TcpStream::connect(&addr, handle)); + self.current = Some(connect(&addr, handle)?); continue; } @@ -393,7 +417,7 @@ mod http_connector { } #[derive(Clone)] -struct HttpConnectExecutor(Arc>); +struct HttpConnectExecutor(Arc + Send + Sync>); impl Executor> for HttpConnectExecutor { fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { @@ -406,43 +430,44 @@ impl Executor> for HttpConnectExecutor { mod tests { #![allow(deprecated)] use std::io; - use tokio::reactor::Core; + use futures::Future; + use tokio::runtime::Runtime; use super::{Connect, Destination, HttpConnector}; #[test] fn test_errors_missing_authority() { - let mut core = Core::new().unwrap(); + let runtime = Runtime::new().unwrap(); let uri = "/foo/bar?baz".parse().unwrap(); let dst = Destination { uri, }; - let connector = HttpConnector::new(1, &core.handle()); + let connector = HttpConnector::new(1, runtime.handle()); - assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); } #[test] fn test_errors_enforce_http() { - let mut core = Core::new().unwrap(); + let runtime = Runtime::new().unwrap(); let uri = "https://example.domain/foo/bar?baz".parse().unwrap(); let dst = Destination { uri, }; - let connector = HttpConnector::new(1, &core.handle()); + let connector = HttpConnector::new(1, runtime.handle()); - assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); } #[test] fn test_errors_missing_scheme() { - let mut core = Core::new().unwrap(); + let runtime = Runtime::new().unwrap(); let uri = "example.domain".parse().unwrap(); let dst = Destination { uri, }; - let connector = HttpConnector::new(1, &core.handle()); + let connector = HttpConnector::new(1, runtime.handle()); - assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput); + assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); } } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index efc720d8df..1dffc1f8cc 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -28,7 +28,8 @@ pub struct Sender { // response have been fully processed, and a connection is ready // for more. giver: signal::Giver, - inner: mpsc::Sender<(T, Callback)>, + //inner: mpsc::Sender<(T, Callback)>, + inner: mpsc::Sender>, } impl Sender { @@ -51,21 +52,22 @@ impl Sender { pub fn try_send(&mut self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); - self.inner.try_send((val, Callback::Retry(tx))) + self.inner.try_send(Envelope(Some((val, Callback::Retry(tx))))) .map(move |_| rx) - .map_err(|e| e.into_inner().0) + .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) } pub fn send(&mut self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); - self.inner.try_send((val, Callback::NoRetry(tx))) + self.inner.try_send(Envelope(Some((val, Callback::NoRetry(tx))))) .map(move |_| rx) - .map_err(|e| e.into_inner().0) + .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) } } pub struct Receiver { - inner: mpsc::Receiver<(T, Callback)>, + //inner: mpsc::Receiver<(T, Callback)>, + inner: mpsc::Receiver>, taker: signal::Taker, } @@ -75,7 +77,9 @@ impl Stream for Receiver { fn poll(&mut self) -> Poll, Self::Error> { match self.inner.poll() { - Ok(Async::Ready(item)) => Ok(Async::Ready(item)), + Ok(Async::Ready(item)) => Ok(Async::Ready(item.map(|mut env| { + env.0.take().expect("envelope not dropped") + }))), Ok(Async::NotReady) => { self.taker.want(); Ok(Async::NotReady) @@ -85,6 +89,16 @@ impl Stream for Receiver { } } +/* +TODO: with futures 0.2, bring this Drop back and toss Envelope + +The problem is, there is a bug in futures 0.1 mpsc channel, where +even though you may call `rx.close()`, `rx.poll()` may still think +there are messages and so should park the current task. In futures +0.2, we can use `try_next`, and not even risk such a bug. + +For now, use an `Envelope` that has this drop guard logic instead. + impl Drop for Receiver { fn drop(&mut self) { self.taker.cancel(); @@ -105,6 +119,17 @@ impl Drop for Receiver { } } +*/ + +struct Envelope(Option<(T, Callback)>); + +impl Drop for Envelope { + fn drop(&mut self) { + if let Some((val, cb)) = self.0.take() { + let _ = cb.send(Err((::Error::new_canceled(None::<::Error>), Some(val)))); + } + } +} pub enum Callback { Retry(oneshot::Sender)>>), diff --git a/src/client/mod.rs b/src/client/mod.rs index 87f34f2bde..1761ae7aeb 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -3,7 +3,6 @@ use std::fmt; use std::io; use std::marker::PhantomData; -use std::rc::Rc; use std::sync::Arc; use std::time::Duration; @@ -13,6 +12,7 @@ use http::{Method, Request, Response, Uri, Version}; use http::header::{Entry, HeaderValue, HOST}; use http::uri::Scheme; use tokio::reactor::Handle; +use tokio_executor::spawn; pub use tokio_service::Service; use proto::body::{Body, Entity}; @@ -36,7 +36,7 @@ mod tests; /// A Client to make outgoing HTTP requests. pub struct Client { - connector: Rc, + connector: Arc, executor: Exec, h1_writev: bool, pool: Pool>, @@ -52,6 +52,12 @@ impl Client { } } +impl Default for Client { + fn default() -> Client { + Client::new(&Handle::current()) + } +} + impl Client { /// Configure a Client. /// @@ -59,11 +65,11 @@ impl Client { /// /// ```no_run /// # extern crate hyper; - /// # extern crate tokio_core; + /// # extern crate tokio; /// /// # fn main() { - /// # let core = tokio_core::reactor::Core::new().unwrap(); - /// # let handle = core.handle(); + /// # let runtime = tokio::runtime::Runtime::new().unwrap(); + /// # let handle = runtime.handle(); /// let client = hyper::Client::configure() /// .keep_alive(true) /// .build(&handle); @@ -77,22 +83,10 @@ impl Client { } impl Client { - // Eventually, a Client won't really care about a tokio Handle, and only - // the executor used to spawn background tasks. Removing this method is - // a breaking change, so for now, it's just deprecated. - #[doc(hidden)] - #[deprecated] - pub fn handle(&self) -> &Handle { - match self.executor { - Exec::Handle(ref h) => h, - Exec::Executor(..) => panic!("Client not built with a Handle"), - } - } - #[inline] fn configured(config: Config, exec: Exec) -> Client { Client { - connector: Rc::new(config.connector), + connector: Arc::new(config.connector), executor: exec, h1_writev: config.h1_writev, pool: Pool::new(config.keep_alive, config.keep_alive_timeout), @@ -103,10 +97,11 @@ impl Client { } impl Client -where C: Connect + 'static, +where C: Connect + Sync + 'static, C::Transport: 'static, C::Future: 'static, - B: Entity + 'static, + B: Entity + Send + 'static, + B::Data: Send, { /// Send a `GET` request to the supplied `Uri`. @@ -195,7 +190,7 @@ where C: Connect + 'static, } //TODO: replace with `impl Future` when stable - fn send_request(&self, mut req: Request, domain: &str) -> Box, Error=ClientError>> { + fn send_request(&self, mut req: Request, domain: &str) -> Box, Error=ClientError> + Send> { let url = req.uri().clone(); let checkout = self.pool.checkout(domain); let connect = { @@ -280,16 +275,15 @@ where C: Connect + 'static, } fn schedule_pool_timer(&self) { - if let Exec::Handle(ref h) = self.executor { - self.pool.spawn_expired_interval(h); - } + self.pool.spawn_expired_interval(&self.executor); } } impl Service for Client where C: Connect + 'static, C::Future: 'static, - B: Entity + 'static, + B: Entity + Send + 'static, + B::Data: Send, { type Request = Request; type Response = Response; @@ -323,7 +317,7 @@ impl fmt::Debug for Client { /// A `Future` that will resolve to an HTTP Response. #[must_use = "futures do nothing unless polled"] -pub struct FutureResponse(Box, Error=::Error> + 'static>); +pub struct FutureResponse(Box, Error=::Error> + Send + 'static>); impl fmt::Debug for FutureResponse { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -343,7 +337,7 @@ impl Future for FutureResponse { struct RetryableSendRequest { client: Client, domain: String, - future: Box, Error=ClientError>>, + future: Box, Error=ClientError> + Send>, uri: Uri, } @@ -351,7 +345,8 @@ impl Future for RetryableSendRequest where C: Connect + 'static, C::Future: 'static, - B: Entity + 'static, + B: Entity + Send + 'static, + B::Data: Send, { type Item = Response; type Error = ::Error; @@ -562,12 +557,13 @@ impl Config where C: Connect, C::Transport: 'static, C::Future: 'static, - B: Entity, + B: Entity + Send, + B::Data: Send, { /// Construct the Client with this configuration. #[inline] - pub fn build(self, handle: &Handle) -> Client { - Client::configured(self, Exec::Handle(handle.clone())) + pub fn build(self) -> Client { + Client::configured(self, Exec::Default) } /// Construct a Client with this configuration and an executor. @@ -576,14 +572,16 @@ where C: Connect, /// to drive requests and responses. pub fn executor(self, executor: E) -> Client where - E: Executor + 'static, + E: Executor + Send + Sync + 'static, { - Client::configured(self, Exec::Executor(Rc::new(executor))) + Client::configured(self, Exec::new(executor)) } } impl Config -where B: Entity, +where + B: Entity + Send, + B::Data: Send, { /// Construct the Client with this configuration. #[inline] @@ -592,7 +590,22 @@ where B: Entity, if self.keep_alive { connector.set_keepalive(self.keep_alive_timeout); } - self.connector(connector).build(handle) + self.connector(connector).build() + } + + /// Construct a Client with this configuration and an executor. + /// + /// The executor will be used to spawn "background" connection tasks + /// to drive requests and responses. + pub fn build_with_executor(self, handle: &Handle, executor: E) -> Client + where + E: Executor + Send + Sync + 'static, + { + let mut connector = HttpConnector::new(4, handle); + if self.keep_alive { + connector.set_keepalive(self.keep_alive_timeout); + } + self.connector(connector).executor(executor) } } @@ -622,18 +635,22 @@ impl Clone for Config { #[derive(Clone)] enum Exec { - Handle(Handle), - Executor(Rc>), + Default, + Executor(Arc + Send + Sync>), } impl Exec { + pub(crate) fn new + Send + Sync + 'static>(executor: E) -> Exec { + Exec::Executor(Arc::new(executor)) + } + fn execute(&self, fut: F) -> io::Result<()> where - F: Future + 'static, + F: Future + Send + 'static, { match *self { - Exec::Handle(ref h) => h.spawn(fut), + Exec::Default => spawn(fut), Exec::Executor(ref e) => { e.execute(bg(Box::new(fut))) .map_err(|err| { @@ -660,10 +677,10 @@ mod background { // and only implementeds `Future`. #[allow(missing_debug_implementations)] pub struct Background { - inner: Box>, + inner: Box + Send>, } - pub fn bg(fut: Box>) -> Background { + pub fn bg(fut: Box + Send>) -> Background { Background { inner: fut, } diff --git a/src/client/pool.rs b/src/client/pool.rs index 7531b77e44..de4024b04b 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -6,7 +6,9 @@ use std::time::{Duration, Instant}; use futures::{Future, Async, Poll, Stream}; use futures::sync::oneshot; -use tokio::reactor::{Handle, Interval}; +use futures_timer::Interval; + +use super::Exec; pub struct Pool { inner: Arc>>, @@ -218,8 +220,8 @@ impl PoolInner { } -impl Pool { - pub(super) fn spawn_expired_interval(&self, handle: &Handle) { +impl Pool { + pub(super) fn spawn_expired_interval(&self, exec: &Exec) { let dur = { let mut inner = self.inner.lock().unwrap(); @@ -239,12 +241,11 @@ impl Pool { } }; - let interval = Interval::new(dur, handle) - .expect("reactor is gone"); - handle.spawn(IdleInterval { + let interval = Interval::new(dur); + exec.execute(IdleInterval { interval: interval, pool: Arc::downgrade(&self.inner), - }); + }).unwrap(); } } @@ -431,7 +432,7 @@ mod tests { use std::time::Duration; use futures::{Async, Future}; use futures::future; - use super::{Closed, Pool}; + use super::{Closed, Pool, Exec}; impl Closed for i32 { fn is_closed(&self) -> bool { @@ -489,9 +490,11 @@ mod tests { #[test] fn test_pool_timer_removes_expired() { - let mut core = ::tokio::reactor::Core::new().unwrap(); + let runtime = ::tokio::runtime::Runtime::new().unwrap(); let pool = Pool::new(true, Some(Duration::from_millis(100))); - pool.spawn_expired_interval(&core.handle()); + + let executor = runtime.executor(); + pool.spawn_expired_interval(&Exec::new(executor)); let key = Arc::new("foo".to_string()); pool.pooled(key.clone(), 41); @@ -500,11 +503,9 @@ mod tests { assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); - let timeout = ::tokio::reactor::Timeout::new( - Duration::from_millis(400), // allow for too-good resolution - &core.handle() - ).unwrap(); - core.run(timeout).unwrap(); + ::futures_timer::Delay::new( + Duration::from_millis(400) // allow for too-good resolution + ).wait().unwrap(); assert!(pool.inner.lock().unwrap().idle.get(&key).is_none()); } diff --git a/src/client/tests.rs b/src/client/tests.rs index ad2a16d779..2030c3e8d7 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -1,8 +1,11 @@ extern crate pretty_env_logger; +use std::thread; +use std::time::Duration; + use futures::Async; use futures::future::poll_fn; -use tokio::reactor::Core; +use tokio::executor::thread_pool::{Builder as ThreadPoolBuilder}; use mock::MockConnector; use super::*; @@ -10,8 +13,8 @@ use super::*; #[test] fn retryable_request() { let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); + let executor = ThreadPoolBuilder::new().pool_size(1).build(); let mut connector = MockConnector::new(); let sock1 = connector.mock("http://mock.local"); @@ -19,8 +22,7 @@ fn retryable_request() { let client = Client::configure() .connector(connector) - .build(&core.handle()); - + .executor(executor.sender().clone()); { @@ -34,7 +36,7 @@ fn retryable_request() { try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }); - core.run(res1.join(srv1)).expect("res1"); + res1.join(srv1).wait().expect("res1"); } drop(sock1); @@ -52,22 +54,21 @@ fn retryable_request() { Ok(Async::Ready(())) }); - core.run(res2.join(srv2)).expect("res2"); + res2.join(srv2).wait().expect("res2"); } #[test] fn conn_reset_after_write() { let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); + let executor = ThreadPoolBuilder::new().pool_size(1).build(); let mut connector = MockConnector::new(); let sock1 = connector.mock("http://mock.local"); let client = Client::configure() .connector(connector) - .build(&core.handle()); - + .executor(executor.sender().clone()); { let req = Request::builder() @@ -82,9 +83,12 @@ fn conn_reset_after_write() { try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }); - core.run(res1.join(srv1)).expect("res1"); + res1.join(srv1).wait().expect("res1"); } + // sleep to allow some time for the connection to return to the pool + thread::sleep(Duration::from_secs(1)); + let req = Request::builder() .uri("http://mock.local/a") .body(Default::default()) @@ -102,7 +106,7 @@ fn conn_reset_after_write() { sock1.take(); Ok(Async::Ready(())) }); - let err = core.run(res2.join(srv2)).expect_err("res2"); + let err = res2.join(srv2).wait().expect_err("res2"); match err { ::Error::Incomplete => (), other => panic!("expected Incomplete, found {:?}", other) diff --git a/src/lib.rs b/src/lib.rs index 66a0fa9a72..f39525024e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,13 +19,16 @@ extern crate bytes; #[macro_use] extern crate futures; extern crate futures_cpupool; +extern crate futures_timer; extern crate http; extern crate httparse; extern crate iovec; #[macro_use] extern crate log; +extern crate net2; extern crate relay; extern crate time; -extern crate tokio_core as tokio; +extern crate tokio; +extern crate tokio_executor; #[macro_use] extern crate tokio_io; extern crate tokio_service; extern crate unicase; diff --git a/src/mock.rs b/src/mock.rs index 4bf49f2835..b4bdef187b 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -1,8 +1,7 @@ -use std::cell::RefCell; use std::collections::HashMap; use std::cmp; use std::io::{self, Read, Write}; -use std::rc::Rc; +use std::sync::{Arc, Mutex}; use bytes::Buf; use futures::{Async, Poll}; @@ -284,7 +283,7 @@ impl ::std::ops::Deref for AsyncIo { } pub struct Duplex { - inner: Rc>, + inner: Arc>, } struct DuplexInner { @@ -295,21 +294,22 @@ struct DuplexInner { impl Read for Duplex { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.borrow_mut().read.read(buf) + self.inner.lock().unwrap().read.read(buf) } } impl Write for Duplex { fn write(&mut self, buf: &[u8]) -> io::Result { - if let Some(task) = self.inner.borrow_mut().handle_read_task.take() { + let mut inner = self.inner.lock().unwrap(); + if let Some(task) = inner.handle_read_task.take() { trace!("waking DuplexHandle read"); task.notify(); } - self.inner.borrow_mut().write.write(buf) + inner.write.write(buf) } fn flush(&mut self) -> io::Result<()> { - self.inner.borrow_mut().write.flush() + self.inner.lock().unwrap().write.flush() } } @@ -323,20 +323,21 @@ impl AsyncWrite for Duplex { } fn write_buf(&mut self, buf: &mut B) -> Poll { - if let Some(task) = self.inner.borrow_mut().handle_read_task.take() { + let mut inner = self.inner.lock().unwrap(); + if let Some(task) = inner.handle_read_task.take() { task.notify(); } - self.inner.borrow_mut().write.write_buf(buf) + inner.write.write_buf(buf) } } pub struct DuplexHandle { - inner: Rc>, + inner: Arc>, } impl DuplexHandle { pub fn read(&self, buf: &mut [u8]) -> Poll { - let mut inner = self.inner.borrow_mut(); + let mut inner = self.inner.lock().unwrap(); assert!(buf.len() >= inner.write.inner.len()); if inner.write.inner.is_empty() { trace!("DuplexHandle read parking"); @@ -348,7 +349,7 @@ impl DuplexHandle { } pub fn write(&self, bytes: &[u8]) -> Poll { - let mut inner = self.inner.borrow_mut(); + let mut inner = self.inner.lock().unwrap(); assert!(inner.read.inner.vec.is_empty()); assert_eq!(inner.read.inner.pos, 0); inner @@ -364,20 +365,20 @@ impl DuplexHandle { impl Drop for DuplexHandle { fn drop(&mut self) { trace!("mock duplex handle drop"); - let mut inner = self.inner.borrow_mut(); + let mut inner = self.inner.lock().unwrap(); inner.read.close(); inner.write.close(); } } pub struct MockConnector { - mocks: RefCell>>, + mocks: Mutex>>, } impl MockConnector { pub fn new() -> MockConnector { MockConnector { - mocks: RefCell::new(HashMap::new()), + mocks: Mutex::new(HashMap::new()), } } @@ -392,7 +393,7 @@ impl MockConnector { inner.read.park_tasks(true); inner.write.park_tasks(true); - let inner = Rc::new(RefCell::new(inner)); + let inner = Arc::new(Mutex::new(inner)); let duplex = Duplex { inner: inner.clone(), @@ -401,7 +402,7 @@ impl MockConnector { inner: inner, }; - self.mocks.borrow_mut().entry(key) + self.mocks.lock().unwrap().entry(key) .or_insert(Vec::new()) .push(duplex); @@ -422,7 +423,7 @@ impl Connect for MockConnector { } else { "".to_owned() }); - let mut mocks = self.mocks.borrow_mut(); + let mut mocks = self.mocks.lock().unwrap(); let mocks = mocks.get_mut(&key) .expect(&format!("unknown mocks uri: {}", key)); assert!(!mocks.is_empty(), "no additional mocks for {}", key); diff --git a/src/server/mod.rs b/src/server/mod.rs index 18b0d0e7cb..30ca9ebb6b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -6,20 +6,21 @@ pub mod conn; mod service; -use std::cell::RefCell; use std::fmt; use std::io; use std::marker::PhantomData; -use std::net::SocketAddr; -use std::rc::{Rc, Weak}; +use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use futures::task::{self, Task}; use futures::future::{self}; use futures::{Future, Stream, Poll, Async}; +use futures_timer::Delay; use http::{Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio::reactor::{Core, Handle, Timeout}; +use tokio::spawn; +use tokio::reactor::Handle; use tokio::net::TcpListener; pub use tokio_service::{NewService, Service}; @@ -54,7 +55,7 @@ where { protocol: Http, new_service: S, - reactor: Core, + handle: Handle, listener: TcpListener, shutdown_timeout: Duration, } @@ -81,14 +82,25 @@ pub struct SpawnAll { /// A stream of connections from binding to an address. #[must_use = "streams do nothing unless polled"] -#[derive(Debug)] pub struct AddrIncoming { addr: SocketAddr, keep_alive_timeout: Option, listener: TcpListener, handle: Handle, sleep_on_errors: bool, - timeout: Option, + timeout: Option, +} + +impl fmt::Debug for AddrIncoming { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("AddrIncoming") + .field("addr", &self.addr) + .field("keep_alive_timeout", &self.keep_alive_timeout) + .field("listener", &self.listener) + .field("handle", &self.handle) + .field("sleep_on_errors", &self.sleep_on_errors) + .finish() + } } // ===== impl Http ===== @@ -156,19 +168,39 @@ impl + 'static> Http { where S: NewService, Response = Response, Error = ::Error> + 'static, Bd: Entity, { - let core = try!(Core::new()); - let handle = core.handle(); - let listener = try!(TcpListener::bind(addr, &handle)); + let handle = Handle::current(); + let std_listener = StdTcpListener::bind(addr)?; + let listener = try!(TcpListener::from_std(std_listener, &handle)); Ok(Server { new_service: new_service, - reactor: core, + handle: handle, listener: listener, protocol: self.clone(), shutdown_timeout: Duration::new(1, 0), }) } + /// Bind the provided `addr` and return a server with the default `Handle`. + /// + /// This is method will bind the `addr` provided with a new TCP listener ready + /// to accept connections. Each connection will be processed with the + /// `new_service` object provided as well, creating a new service per + /// connection. + pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> + where S: NewService, Response = Response, Error = ::Error>, + Bd: Entity, + { + let handle = Handle::current(); + let std_listener = StdTcpListener::bind(addr)?; + let listener = TcpListener::from_std(std_listener, &handle)?; + let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?; + if self.keep_alive { + incoming.set_keepalive(Some(Duration::from_secs(90))); + } + Ok(self.serve_incoming(incoming, new_service)) + } + /// Bind the provided `addr` and return a server with a shared `Core`. /// /// This method allows the ability to share a `Core` with multiple servers. @@ -181,7 +213,8 @@ impl + 'static> Http { where S: NewService, Response = Response, Error = ::Error>, Bd: Entity, { - let listener = TcpListener::bind(addr, &handle)?; + let std_listener = StdTcpListener::bind(addr)?; + let listener = TcpListener::from_std(std_listener, &handle)?; let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?; if self.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); @@ -221,17 +254,18 @@ impl + 'static> Http { /// ``` /// # extern crate futures; /// # extern crate hyper; - /// # extern crate tokio_core; + /// # extern crate tokio; /// # extern crate tokio_io; /// # use futures::Future; /// # use hyper::{Body, Request, Response}; /// # use hyper::server::{Http, Service}; /// # use tokio_io::{AsyncRead, AsyncWrite}; - /// # use tokio_core::reactor::Handle; - /// # fn run(some_io: I, some_service: S, some_handle: &Handle) + /// # use tokio::reactor::Handle; + /// # fn run(some_io: I, some_service: S) /// # where - /// # I: AsyncRead + AsyncWrite + 'static, - /// # S: Service, Response=Response, Error=hyper::Error> + 'static, + /// # I: AsyncRead + AsyncWrite + Send + 'static, + /// # S: Service, Response=Response, Error=hyper::Error> + Send + 'static, + /// # S::Future: Send /// # { /// let http = Http::::new(); /// let conn = http.serve_connection(some_io, some_service); @@ -240,7 +274,7 @@ impl + 'static> Http { /// .map(|_| ()) /// .map_err(|e| eprintln!("server connection error: {}", e)); /// - /// some_handle.spawn(fut); + /// tokio::spawn(fut); /// # } /// # fn main() {} /// ``` @@ -286,21 +320,38 @@ impl fmt::Debug for Http { // ===== impl Server ===== + +/// TODO: add docs +pub struct Run(Box + Send + 'static>); + +impl fmt::Debug for Run { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Run").finish() + } +} + +impl Future for Run { + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll<(), ::Error> { + self.0.poll() + } +} + + impl Server - where S: NewService, Response = Response, Error = ::Error> + 'static, - B: Entity + 'static, + where S: NewService, Response = Response, Error = ::Error> + Send + 'static, + ::Instance: Send, + <::Instance as Service>::Future: Send, + B: Entity + Send + 'static, + B::Data: Send, { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> ::Result { Ok(try!(self.listener.local_addr())) } - /// Returns a handle to the underlying event loop that this server will be - /// running on. - pub fn handle(&self) -> Handle { - self.reactor.handle() - } - /// Configure the amount of time this server will wait for a "graceful /// shutdown". /// @@ -318,7 +369,7 @@ impl Server /// /// This method does not currently return, but it will return an error if /// one occurs. - pub fn run(self) -> ::Result<()> { + pub fn run(self) -> Run { self.run_until(future::empty()) } @@ -335,40 +386,42 @@ impl Server /// `shutdown_timeout` time waiting for active connections to shut down. /// Once the `shutdown_timeout` elapses or all active connections are /// cleaned out then this method will return. - pub fn run_until(self, shutdown_signal: F) -> ::Result<()> - where F: Future, + pub fn run_until(self, shutdown_signal: F) -> Run + where F: Future + Send + 'static, { - let Server { protocol, new_service, mut reactor, listener, shutdown_timeout } = self; - - let handle = reactor.handle(); + let Server { protocol, new_service, handle, listener, shutdown_timeout } = self; - let mut incoming = AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors)?; + let mut incoming = match AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors) { + Ok(incoming) => incoming, + Err(err) => return Run(Box::new(future::err(err.into()))), + }; if protocol.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); } // Mini future to track the number of active services - let info = Rc::new(RefCell::new(Info { + let info = Arc::new(Mutex::new(Info { active: 0, blocker: None, })); // Future for our server's execution - let srv = incoming.for_each(|socket| { + let info_cloned = info.clone(); + let srv = incoming.for_each(move |socket| { let addr = socket.remote_addr; debug!("accepted new connection ({})", addr); let service = new_service.new_service()?; let s = NotifyService { inner: service, - info: Rc::downgrade(&info), + info: Arc::downgrade(&info_cloned), }; - info.borrow_mut().active += 1; + info_cloned.lock().unwrap().active += 1; let fut = protocol.serve_connection(socket, s) .map(|_| ()) .map_err(move |err| error!("server connection error: ({}) {}", addr, err)); - handle.spawn(fut); + spawn(fut); Ok(()) }); @@ -383,24 +436,30 @@ impl Server // // When we get a shutdown signal (`Ok`) then we drop the TCP listener to // stop accepting incoming connections. - match reactor.run(shutdown_signal.select(srv)) { - Ok(((), _incoming)) => {} - Err((e, _other)) => return Err(e.into()), - } + let main_execution = shutdown_signal.select(srv).then(move |result| { + match result { + Ok(((), _incoming)) => {}, + Err((e, _other)) => return future::Either::A(future::err(e.into())) + } - // Ok we've stopped accepting new connections at this point, but we want - // to give existing connections a chance to clear themselves out. Wait - // at most `shutdown_timeout` time before we just return clearing - // everything out. - // - // Our custom `WaitUntilZero` will resolve once all services constructed - // here have been destroyed. - let timeout = try!(Timeout::new(shutdown_timeout, &handle)); - let wait = WaitUntilZero { info: info.clone() }; - match reactor.run(wait.select(timeout)) { - Ok(_) => Ok(()), - Err((e, _)) => Err(e.into()) - } + // Ok we've stopped accepting new connections at this point, but we want + // to give existing connections a chance to clear themselves out. Wait + // at most `shutdown_timeout` time before we just return clearing + // everything out. + // + // Our custom `WaitUntilZero` will resolve once all services constructed + // here have been destroyed. + let timeout = Delay::new(shutdown_timeout); + let wait = WaitUntilZero { info: info.clone() }; + future::Either::B(wait.select(timeout).then(|result| { + match result { + Ok(_) => Ok(()), + Err((e, _)) => Err(e.into()) + } + })) + }); + + Run(Box::new(main_execution)) } } @@ -537,8 +596,8 @@ impl Stream for AddrIncoming { } self.timeout = None; loop { - match self.listener.accept() { - Ok((socket, addr)) => { + match self.listener.poll_accept() { + Ok(Async::Ready((socket, addr))) => { if let Some(dur) = self.keep_alive_timeout { if let Err(e) = socket.set_keepalive(Some(dur)) { trace!("error trying to set TCP keepalive: {}", e); @@ -546,7 +605,7 @@ impl Stream for AddrIncoming { } return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Ok(Async::NotReady) => return Ok(Async::NotReady), Err(ref e) if self.sleep_on_errors => { // Connection errors can be ignored directly, continue by // accepting the next request. @@ -557,8 +616,7 @@ impl Stream for AddrIncoming { let delay = ::std::time::Duration::from_millis(10); debug!("accept error: {}; sleeping {:?}", e, delay); - let mut timeout = Timeout::new(delay, &self.handle) - .expect("can always set a timeout"); + let mut timeout = Delay::new(delay); let result = timeout.poll() .expect("timeout never fails"); match result { @@ -660,11 +718,11 @@ mod addr_stream { struct NotifyService { inner: S, - info: Weak>, + info: Weak>, } struct WaitUntilZero { - info: Rc>, + info: Arc>, } struct Info { @@ -689,7 +747,7 @@ impl Drop for NotifyService { Some(info) => info, None => return, }; - let mut info = info.borrow_mut(); + let mut info = info.lock().unwrap(); info.active -= 1; if info.active == 0 { if let Some(task) = info.blocker.take() { @@ -704,7 +762,7 @@ impl Future for WaitUntilZero { type Error = io::Error; fn poll(&mut self) -> Poll<(), io::Error> { - let mut info = self.info.borrow_mut(); + let mut info = self.info.lock().unwrap(); if info.active == 0 { Ok(().into()) } else { diff --git a/tests/client.rs b/tests/client.rs index baf23379de..33f0f2e334 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -2,12 +2,14 @@ extern crate bytes; extern crate hyper; extern crate futures; -extern crate tokio_core; +extern crate futures_timer; +extern crate net2; +extern crate tokio; extern crate tokio_io; extern crate pretty_env_logger; use std::io::{self, Read, Write}; -use std::net::TcpListener; +use std::net::{SocketAddr, TcpListener}; use std::thread; use std::time::Duration; @@ -15,11 +17,16 @@ use hyper::{Body, Client, Method, Request, StatusCode}; use futures::{Future, Stream}; use futures::sync::oneshot; - -use tokio_core::reactor::{Core, Handle}; +use tokio::reactor::Handle; +use tokio::runtime::Runtime; +use tokio::net::{ConnectFuture, TcpStream}; fn s(buf: &[u8]) -> &str { - ::std::str::from_utf8(buf).unwrap() + ::std::str::from_utf8(buf).expect("from_utf8") +} + +fn tcp_connect(addr: &SocketAddr) -> ConnectFuture { + TcpStream::connect(addr) } macro_rules! test { @@ -80,12 +87,12 @@ macro_rules! test { #[test] fn $name() { let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); + let runtime = Runtime::new().expect("runtime new"); let res = test! { INNER; name: $name, - core: &mut core, + runtime: &runtime, server: expected: $server_expected, reply: $server_reply, @@ -96,7 +103,7 @@ macro_rules! test { url: $client_url, headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, - }.unwrap(); + }.expect("test"); assert_eq!(res.status(), StatusCode::$client_status); @@ -104,7 +111,12 @@ macro_rules! test { assert_eq!(res.headers()[$response_header_name], $response_header_val); )* - let body = core.run(res.into_body().into_stream().concat2()).unwrap(); + let body = res + .into_body() + .into_stream() + .concat2() + .wait() + .expect("body concat wait"); let expected_res_body = Option::<&[u8]>::from($response_body) .unwrap_or_default(); @@ -128,12 +140,12 @@ macro_rules! test { #[test] fn $name() { let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); + let runtime = Runtime::new().expect("runtime new"); let err = test! { INNER; name: $name, - core: &mut core, + runtime: &runtime, server: expected: $server_expected, reply: $server_reply, @@ -146,7 +158,7 @@ macro_rules! test { body: $request_body, }.unwrap_err(); if !$err(&err) { - panic!("unexpected error: {:?}", err) + panic!("expected error, unexpected variant: {:?}", err) } } ); @@ -154,7 +166,7 @@ macro_rules! test { ( INNER; name: $name:ident, - core: $core:expr, + runtime: $runtime:expr, server: expected: $server_expected:expr, reply: $server_reply:expr, @@ -166,15 +178,15 @@ macro_rules! test { headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, ) => ({ - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let core = $core; + let server = TcpListener::bind("127.0.0.1:0").expect("bind"); + let addr = server.local_addr().expect("local_addr"); + let runtime = $runtime; let mut config = Client::configure(); if !$set_host { config = config.set_host(false); } - let client = config.build(&core.handle()); + let client = config.build_with_executor(&runtime.handle(), runtime.executor()); let body = if let Some(body) = $request_body { let body: &'static str = body; @@ -189,7 +201,7 @@ macro_rules! test { .header($request_header_name, $request_header_val) )* .body(body) - .unwrap(); + .expect("request builder"); let res = client.request(req); @@ -198,9 +210,9 @@ macro_rules! test { let thread = thread::Builder::new() .name(format!("tcp-server<{}>", stringify!($name))); thread.spawn(move || { - let mut inc = server.accept().unwrap().0; - inc.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - inc.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut inc = server.accept().expect("accept").0; + inc.set_read_timeout(Some(Duration::from_secs(5))).expect("set_read_timeout"); + inc.set_write_timeout(Some(Duration::from_secs(5))).expect("set_write_timeout"); let expected = format!($server_expected, addr=addr); let mut buf = [0; 4096]; let mut n = 0; @@ -212,15 +224,13 @@ macro_rules! test { } assert_eq!(s(&buf[..n]), expected); - inc.write_all($server_reply.as_ref()).unwrap(); + inc.write_all($server_reply.as_ref()).expect("write_all"); let _ = tx.send(()); - }).unwrap(); + }).expect("thread spawn"); let rx = rx.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let work = res.join(rx).map(|r| r.0); - - core.run(work) + res.join(rx).map(|r| r.0).wait() }); } @@ -634,8 +644,9 @@ mod dispatch_impl { use futures::{self, Future}; use futures::sync::{mpsc, oneshot}; - use tokio_core::reactor::{Timeout}; - use tokio_core::net::TcpStream; + use futures_timer::Delay; + use tokio::net::TcpStream; + use tokio::runtime::Runtime; use tokio_io::{AsyncRead, AsyncWrite}; use hyper::client::connect::{Connect, Connected, Destination, HttpConnector}; @@ -651,12 +662,11 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let client = Client::configure() - .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &core.handle()), closes_tx)) - .build(&handle); + .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &runtime.handle()), closes_tx)) + .executor(runtime.executor()); let (tx1, rx1) = oneshot::channel(); @@ -678,13 +688,13 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - Timeout::new(Duration::from_secs(1), &handle).unwrap() + Delay::new(Duration::from_secs(1)) .from_err() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - core.run(closes.into_future()).unwrap().0.expect("closes"); + closes.into_future().wait().unwrap().0.expect("closes"); } #[test] @@ -694,8 +704,8 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -715,7 +725,7 @@ mod dispatch_impl { let res = { let client = Client::configure() .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes_tx)) - .build(&handle); + .executor(runtime.executor()); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -725,15 +735,15 @@ mod dispatch_impl { assert_eq!(res.status(), hyper::StatusCode::OK); res.into_body().into_stream().concat2() }).and_then(|_| { - Timeout::new(Duration::from_secs(1), &handle).unwrap() + Delay::new(Duration::from_secs(1)) .from_err() }) }; // client is dropped let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - core.run(closes.into_future()).unwrap().0.expect("closes"); + closes.into_future().wait().unwrap().0.expect("closes"); } @@ -743,8 +753,8 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let (closes_tx, mut closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -768,7 +778,7 @@ mod dispatch_impl { let client = Client::configure() .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes_tx)) - .build(&handle); + .executor(runtime.executor()); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -779,7 +789,7 @@ mod dispatch_impl { res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); // not closed yet, just idle { @@ -790,14 +800,14 @@ mod dispatch_impl { } drop(client); - let t = Timeout::new(Duration::from_millis(100), &handle).unwrap() + let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.into_future() .map(|(opt, _)| { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = core.run(t.select(close)); + let _ = t.select(close).wait(); } @@ -807,8 +817,8 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -832,7 +842,7 @@ mod dispatch_impl { let res = { let client = Client::configure() .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes_tx)) - .build(&handle); + .executor(runtime.executor()); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -842,16 +852,16 @@ mod dispatch_impl { }; //let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - core.run(res.select2(rx1)).unwrap(); + res.select2(rx1).wait().unwrap(); // res now dropped - let t = Timeout::new(Duration::from_millis(100), &handle).unwrap() + let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.into_future() .map(|(opt, _)| { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = core.run(t.select(close)); + let _ = t.select(close).wait(); } #[test] @@ -860,8 +870,8 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -884,7 +894,7 @@ mod dispatch_impl { let res = { let client = Client::configure() .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes_tx)) - .build(&handle); + .executor(runtime.executor()); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -895,16 +905,16 @@ mod dispatch_impl { }; let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - let t = Timeout::new(Duration::from_millis(100), &handle).unwrap() + let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.into_future() .map(|(opt, _)| { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = core.run(t.select(close)); + let _ = t.select(close).wait(); } #[test] @@ -914,8 +924,8 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -935,7 +945,7 @@ mod dispatch_impl { let client = Client::configure() .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes_tx)) .keep_alive(false) - .build(&handle); + .executor(runtime.executor()); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -946,16 +956,16 @@ mod dispatch_impl { res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - let t = Timeout::new(Duration::from_millis(100), &handle).unwrap() + let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.into_future() .map(|(opt, _)| { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = core.run(t.select(close)); + let _ = t.select(close).wait(); } #[test] @@ -965,8 +975,8 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -983,7 +993,7 @@ mod dispatch_impl { let client = Client::configure() .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes_tx)) - .build(&handle); + .executor(runtime.executor()); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -994,23 +1004,23 @@ mod dispatch_impl { res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - let t = Timeout::new(Duration::from_millis(100), &handle).unwrap() + let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.into_future() .map(|(opt, _)| { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = core.run(t.select(close)); + let _ = t.select(close).wait(); } #[test] fn conn_drop_prevents_pool_checkout() { // a drop might happen for any sort of reason, and we can protect - // against a lot of them, but if the `Core` is dropped, we can't + // against a lot of them, but if the `runtime` is dropped, we can't // really catch that. So, this is case to always check. // // See https://github.com/hyperium/hyper/issues/1429 @@ -1020,8 +1030,8 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle().clone(); let (tx1, rx1) = oneshot::channel(); @@ -1038,7 +1048,7 @@ mod dispatch_impl { let uri = format!("http://{}/a", addr).parse::().unwrap(); - let client = Client::new(&handle); + let client = Client::configure().build_with_executor(&handle, runtime.executor()); let req = Request::builder() .uri(uri.clone()) @@ -1049,11 +1059,11 @@ mod dispatch_impl { res.into_body().into_stream().concat2() }); - core.run(res).unwrap(); + res.wait().unwrap(); - // drop previous Core - core = Core::new().unwrap(); - let timeout = Timeout::new(Duration::from_millis(200), &core.handle()).unwrap(); + // drop previous runtime + drop(runtime); + let timeout = Delay::new(Duration::from_millis(200)); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); @@ -1064,7 +1074,7 @@ mod dispatch_impl { let res = client.request(req); // this does trigger an 'event loop gone' error, but before, it would // panic internally on a `SendError`, which is what we're testing against. - let err = core.run(res.join(rx).map(|r| r.0)).unwrap_err(); + let err = res.join(rx).map(|r| r.0).wait().unwrap_err(); assert_eq!(err.description(), "event loop gone"); } @@ -1072,8 +1082,8 @@ mod dispatch_impl { fn client_custom_executor() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -1090,7 +1100,7 @@ mod dispatch_impl { let client = Client::configure() .connector(DebugConnector::with_http_and_closes(HttpConnector::new(1, &handle), closes_tx)) - .executor(handle.clone()); + .executor(runtime.executor()); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1102,19 +1112,19 @@ mod dispatch_impl { }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let timeout = Timeout::new(Duration::from_millis(200), &handle).unwrap(); + let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - let t = Timeout::new(Duration::from_millis(100), &handle).unwrap() + let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); let close = closes.into_future() .map(|(opt, _)| { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = core.run(t.select(close)); + let _ = t.select(close).wait(); } #[test] @@ -1123,14 +1133,14 @@ mod dispatch_impl { // idle connections that the Checkout would have found let _ = pretty_env_logger::try_init(); - let core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let connector = DebugConnector::new(&handle); let connects = connector.connects.clone(); let client = Client::configure() .connector(connector) - .build(&handle); + .executor(runtime.executor()); assert_eq!(connects.load(Ordering::Relaxed), 0); let req = Request::builder() @@ -1148,14 +1158,14 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let connector = DebugConnector::new(&handle); let connects = connector.connects.clone(); let client = Client::configure() .connector(connector) - .build(&handle); + .executor(runtime.executor()); let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); @@ -1178,7 +1188,7 @@ mod dispatch_impl { }); - assert_eq!(connects.load(Ordering::Relaxed), 0); + assert_eq!(connects.load(Ordering::SeqCst), 0); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() @@ -1186,9 +1196,13 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - assert_eq!(connects.load(Ordering::Relaxed), 1); + assert_eq!(connects.load(Ordering::SeqCst), 1); + + // sleep real quick to let the threadpool put connection in ready + // state and back into client pool + thread::sleep(Duration::from_millis(50)); let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() @@ -1196,9 +1210,9 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); - assert_eq!(connects.load(Ordering::Relaxed), 1, "second request should still only have 1 connect"); + assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect"); } #[test] @@ -1206,15 +1220,15 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let connector = DebugConnector::new(&handle); let connects = connector.connects.clone(); let client = Client::configure() .connector(connector) - .build(&handle); + .executor(runtime.executor()); let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); @@ -1248,7 +1262,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); assert_eq!(connects.load(Ordering::Relaxed), 1); @@ -1258,7 +1272,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); assert_eq!(connects.load(Ordering::Relaxed), 2); } @@ -1268,14 +1282,14 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); let connector = DebugConnector::new(&handle) .proxy(); let client = Client::configure() .connector(connector) - .build(&handle); + .executor(runtime.executor()); let (tx1, rx1) = oneshot::channel(); thread::spawn(move || { @@ -1299,7 +1313,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); } @@ -1335,7 +1349,7 @@ mod dispatch_impl { impl Connect for DebugConnector { type Transport = DebugStream; type Error = io::Error; - type Future = Box>; + type Future = Box + Send>; fn connect(&self, dst: Destination) -> Self::Future { self.connects.fetch_add(1, Ordering::SeqCst); @@ -1393,21 +1407,21 @@ mod conn { use futures::{Async, Future, Poll, Stream}; use futures::future::poll_fn; use futures::sync::oneshot; - use tokio_core::reactor::{Core, Timeout}; - use tokio_core::net::TcpStream; + use futures_timer::Delay; + use tokio::runtime::Runtime; + use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; use hyper::{self, Request}; use hyper::client::conn; - use super::s; + use super::{s, tcp_connect}; #[test] fn get() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let mut runtime = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1428,11 +1442,11 @@ mod conn { let _ = tx1.send(()); }); - let tcp = core.run(TcpStream::connect(&addr, &handle)).unwrap(); + let tcp = tcp_connect(&addr).wait().unwrap(); - let (mut client, conn) = core.run(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); - handle.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let req = Request::builder() .uri("/a") @@ -1444,17 +1458,16 @@ mod conn { }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let timeout = Timeout::new(Duration::from_millis(200), &handle).unwrap(); + let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); } #[test] fn uri_absolute_form() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let mut runtime = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1474,11 +1487,11 @@ mod conn { let _ = tx1.send(()); }); - let tcp = core.run(TcpStream::connect(&addr, &handle)).unwrap(); + let tcp = tcp_connect(&addr).wait().unwrap(); - let (mut client, conn) = core.run(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); - handle.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let req = Request::builder() .uri("http://hyper.local/a") @@ -1491,17 +1504,16 @@ mod conn { }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let timeout = Timeout::new(Duration::from_millis(200), &handle).unwrap(); + let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); - core.run(res.join(rx).map(|r| r.0)).unwrap(); + res.join(rx).map(|r| r.0).wait().unwrap(); } #[test] fn pipeline() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let mut runtime = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1516,11 +1528,11 @@ mod conn { let _ = tx1.send(()); }); - let tcp = core.run(TcpStream::connect(&addr, &handle)).unwrap(); + let tcp = tcp_connect(&addr).wait().unwrap(); - let (mut client, conn) = core.run(conn::handshake(tcp)).unwrap(); + let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); - handle.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let req = Request::builder() .uri("/a") @@ -1548,9 +1560,9 @@ mod conn { let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let timeout = Timeout::new(Duration::from_millis(200), &handle).unwrap(); + let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); - core.run(res1.join(res2).join(rx).map(|r| r.0)).unwrap(); + res1.join(res2).join(rx).map(|r| r.0).wait().unwrap(); } #[test] @@ -1560,8 +1572,7 @@ mod conn { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let _runtime = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1584,14 +1595,14 @@ mod conn { sock.write_all(b"bar=foo").expect("write 2"); }); - let tcp = core.run(TcpStream::connect(&addr, &handle)).unwrap(); + let tcp = tcp_connect(&addr).wait().unwrap(); let io = DebugStream { tcp: tcp, shutdown_called: false, }; - let (mut client, mut conn) = core.run(conn::handshake(io)).unwrap(); + let (mut client, mut conn) = conn::handshake(io).wait().unwrap(); { let until_upgrade = poll_fn(|| { @@ -1610,15 +1621,15 @@ mod conn { let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let timeout = Timeout::new(Duration::from_millis(200), &handle).unwrap(); + let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); - core.run(until_upgrade.join(res).join(rx).map(|r| r.0)).unwrap(); + until_upgrade.join(res).join(rx).map(|r| r.0).wait().unwrap(); // should not be ready now - core.run(poll_fn(|| { + poll_fn(|| { assert!(client.poll_ready().unwrap().is_not_ready()); Ok::<_, ()>(Async::Ready(())) - })).unwrap(); + }).wait().unwrap(); } let parts = conn.into_parts(); @@ -1629,8 +1640,8 @@ mod conn { assert!(!io.shutdown_called, "upgrade shouldn't shutdown AsyncWrite"); assert!(client.poll_ready().is_err()); - let io = core.run(write_all(io, b"foo=bar")).unwrap().0; - let vec = core.run(read_to_end(io, vec![])).unwrap().1; + let io = write_all(io, b"foo=bar").wait().unwrap().0; + let vec = read_to_end(io, vec![]).wait().unwrap().1; assert_eq!(vec, b"bar=foo"); } @@ -1641,8 +1652,7 @@ mod conn { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let handle = core.handle(); + let _runtime = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1664,14 +1674,14 @@ mod conn { sock.write_all(b"bar=foo").expect("write 2"); }); - let tcp = core.run(TcpStream::connect(&addr, &handle)).unwrap(); + let tcp = tcp_connect(&addr).wait().unwrap(); let io = DebugStream { tcp: tcp, shutdown_called: false, }; - let (mut client, mut conn) = core.run(conn::handshake(io)).unwrap(); + let (mut client, mut conn) = conn::handshake(io).wait().unwrap(); { let until_tunneled = poll_fn(|| { @@ -1694,15 +1704,15 @@ mod conn { let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let timeout = Timeout::new(Duration::from_millis(200), &handle).unwrap(); + let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); - core.run(until_tunneled.join(res).join(rx).map(|r| r.0)).unwrap(); + until_tunneled.join(res).join(rx).map(|r| r.0).wait().unwrap(); // should not be ready now - core.run(poll_fn(|| { + poll_fn(|| { assert!(client.poll_ready().unwrap().is_not_ready()); Ok::<_, ()>(Async::Ready(())) - })).unwrap(); + }).wait().unwrap(); } let parts = conn.into_parts(); @@ -1713,8 +1723,8 @@ mod conn { assert!(!io.shutdown_called, "tunnel shouldn't shutdown AsyncWrite"); assert!(client.poll_ready().is_err()); - let io = core.run(write_all(io, b"foo=bar")).unwrap().0; - let vec = core.run(read_to_end(io, vec![])).unwrap().1; + let io = write_all(io, b"foo=bar").wait().unwrap().0; + let vec = read_to_end(io, vec![]).wait().unwrap().1; assert_eq!(vec, b"bar=foo"); } diff --git a/tests/server.rs b/tests/server.rs index d5ce55746e..548b346715 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -3,9 +3,11 @@ extern crate http; extern crate hyper; #[macro_use] extern crate futures; +extern crate futures_timer; +extern crate net2; extern crate spmc; extern crate pretty_env_logger; -extern crate tokio_core; +extern crate tokio; extern crate tokio_io; use std::net::{TcpStream, Shutdown, SocketAddr}; @@ -13,21 +15,29 @@ use std::io::{self, Read, Write}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::sync::{Arc, Mutex}; +use std::net::{TcpListener as StdTcpListener}; use std::thread; use std::time::Duration; use futures::{Future, Stream}; use futures::future::{self, FutureResult, Either}; use futures::sync::oneshot; +use futures_timer::Delay; use http::header::{HeaderName, HeaderValue}; -use tokio_core::net::TcpListener; -use tokio_core::reactor::{Core, Timeout}; +//use net2::TcpBuilder; +use tokio::net::TcpListener; +use tokio::runtime::Runtime; +use tokio::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use hyper::{Body, Request, Response, StatusCode}; use hyper::server::{Http, Service, NewService, service_fn}; +fn tcp_bind(addr: &SocketAddr, handle: &Handle) -> ::tokio::io::Result { + let std_listener = StdTcpListener::bind(addr).unwrap(); + TcpListener::from_std(std_listener, handle) +} #[test] fn get_should_ignore_body() { @@ -67,8 +77,8 @@ fn get_with_body() { #[test] fn get_implicitly_empty() { // See https://github.com/hyperium/hyper/issues/1373 - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -84,11 +94,11 @@ fn get_implicitly_empty() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new().serve_connection(socket, GetImplicitlyEmpty) }); - core.run(fut).unwrap(); + fut.wait().unwrap(); struct GetImplicitlyEmpty; @@ -96,7 +106,7 @@ fn get_implicitly_empty() { type Request = Request; type Response = Response; type Error = hyper::Error; - type Future = Box>; + type Future = Box + Send>; fn call(&self, req: Request) -> Self::Future { Box::new(req.into_body() @@ -744,9 +754,8 @@ fn http_10_request_receives_http_10_response() { #[test] fn disable_keep_alive_mid_request() { - - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -766,7 +775,7 @@ fn disable_keep_alive_mid_request() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new().serve_connection(socket, HelloWorld) .select2(rx1) .then(|r| { @@ -783,15 +792,15 @@ fn disable_keep_alive_mid_request() { }) }); - core.run(fut).unwrap(); + fut.wait().unwrap(); child.join().unwrap(); } #[test] fn disable_keep_alive_post_request() { let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -827,7 +836,7 @@ fn disable_keep_alive_post_request() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.expect("accepted socket"); + let socket = item.expect("accepted socket"); let transport = DebugStream { stream: socket, _debug: dropped2, @@ -848,21 +857,21 @@ fn disable_keep_alive_post_request() { }); assert!(!dropped.load()); - core.run(fut).unwrap(); + fut.wait().unwrap(); // we must poll the Core one more time in order for Windows to drop // the read-blocked socket. // // See https://github.com/carllerche/mio/issues/776 - let timeout = Timeout::new(Duration::from_millis(10), &core.handle()).unwrap(); - core.run(timeout).unwrap(); + let timeout = Delay::new(Duration::from_millis(10)); + timeout.wait().unwrap(); assert!(dropped.load()); child.join().unwrap(); } #[test] fn empty_parse_eof_does_not_return_error() { - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -873,17 +882,17 @@ fn empty_parse_eof_does_not_return_error() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new().serve_connection(socket, HelloWorld) }); - core.run(fut).unwrap(); + fut.wait().unwrap(); } #[test] fn nonempty_parse_eof_returns_error() { - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -895,18 +904,18 @@ fn nonempty_parse_eof_returns_error() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new().serve_connection(socket, HelloWorld) .map(|_| ()) }); - core.run(fut).unwrap_err(); + fut.wait().unwrap_err(); } #[test] fn returning_1xx_response_is_error() { - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -923,7 +932,7 @@ fn returning_1xx_response_is_error() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new() .serve_connection(socket, service_fn(|_| { Ok(Response::builder() @@ -934,15 +943,15 @@ fn returning_1xx_response_is_error() { .map(|_| ()) }); - core.run(fut).unwrap_err(); + fut.wait().unwrap_err(); } #[test] fn upgrades() { use tokio_io::io::{read_to_end, write_all}; let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx, rx) = oneshot::channel(); @@ -971,7 +980,7 @@ fn upgrades() { .into_future() .map_err(|_| -> hyper::Error { unreachable!() }) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); let conn = Http::::new() .serve_connection(socket, service_fn(|_| { let res = Response::builder() @@ -990,24 +999,24 @@ fn upgrades() { }) }); - let conn = core.run(fut).unwrap(); + let conn = fut.wait().unwrap(); // wait so that we don't write until other side saw 101 response - core.run(rx).unwrap(); + rx.wait().unwrap(); let parts = conn.into_parts(); let io = parts.io; assert_eq!(parts.read_buf, "eagerly optimistic"); - let io = core.run(write_all(io, b"foo=bar")).unwrap().0; - let vec = core.run(read_to_end(io, vec![])).unwrap().1; + let io = write_all(io, b"foo=bar").wait().unwrap().0; + let vec = read_to_end(io, vec![]).wait().unwrap().1; assert_eq!(vec, b"bar=foo"); } #[test] fn parse_errors_send_4xx_response() { - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1024,19 +1033,19 @@ fn parse_errors_send_4xx_response() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new() .serve_connection(socket, HelloWorld) .map(|_| ()) }); - core.run(fut).unwrap_err(); + fut.wait().unwrap_err(); } #[test] fn illegal_request_length_returns_400_response() { - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1053,20 +1062,20 @@ fn illegal_request_length_returns_400_response() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new() .serve_connection(socket, HelloWorld) .map(|_| ()) }); - core.run(fut).unwrap_err(); + fut.wait().unwrap_err(); } #[test] fn max_buf_size() { let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); const MAX: usize = 16_000; @@ -1086,21 +1095,21 @@ fn max_buf_size() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new() .max_buf_size(MAX) .serve_connection(socket, HelloWorld) .map(|_| ()) }); - core.run(fut).unwrap_err(); + fut.wait().unwrap_err(); } #[test] fn streaming_body() { let _ = pretty_env_logger::try_init(); - let mut core = Core::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let runtime = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.handle()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx, rx) = oneshot::channel(); @@ -1130,7 +1139,7 @@ fn streaming_body() { .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { - let (socket, _) = item.unwrap(); + let socket = item.unwrap(); Http::::new() .keep_alive(false) .serve_connection(socket, service_fn(|_| { @@ -1143,7 +1152,7 @@ fn streaming_body() { .map(|_| ()) }); - core.run(fut.join(rx)).unwrap(); + fut.join(rx).wait().unwrap(); } // ------------------------------------------------- @@ -1272,7 +1281,7 @@ impl Service for TestService { type Request = Request; type Response = Response; type Error = hyper::Error; - type Future = Box, Error=hyper::Error>>; + type Future = Box, Error=hyper::Error> + Send>; fn call(&self, req: Request) -> Self::Future { let tx1 = self.tx.clone(); let tx2 = self.tx.clone(); @@ -1370,16 +1379,19 @@ fn serve_with_options(options: ServeOptions) -> Serve { let thread_name = format!("test-server-{:?}", dur); let thread = thread::Builder::new().name(thread_name).spawn(move || { - let srv = Http::new() - .keep_alive(keep_alive) - .pipeline(pipeline) - .bind(&addr, TestService { - tx: Arc::new(Mutex::new(msg_tx.clone())), - _timeout: dur, - reply: reply_rx, - }).unwrap(); - addr_tx.send(srv.local_addr().unwrap()).unwrap(); - srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap(); + tokio::run(::futures::future::lazy(move || { + let srv = Http::new() + .keep_alive(keep_alive) + .pipeline(pipeline) + .bind(&addr, TestService { + tx: Arc::new(Mutex::new(msg_tx.clone())), + _timeout: dur, + reply: reply_rx, + }).unwrap(); + addr_tx.send(srv.local_addr().unwrap()).unwrap(); + srv.run_until(shutdown_rx.then(|_| Ok(()))) + .map_err(|err| println!("error {}", err)) + })) }).unwrap(); let addr = addr_rx.recv().unwrap();