diff --git a/tests/client.rs b/tests/client.rs index 4f39083634..80dc4c7c20 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1478,17 +1478,16 @@ mod dispatch_impl { assert_eq!(vec, b"bar=foo"); } - #[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn alpn_h2() { use hyper::Response; use hyper::server::conn::Http; use hyper::service::service_fn; - use tokio_tcp::TcpListener; + use tokio_net::tcp::TcpListener; let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); - let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let mut listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let mut connector = DebugConnector::new(); connector.alpn_h2 = true; @@ -1497,22 +1496,17 @@ mod dispatch_impl { let client = Client::builder() .build::<_, ::hyper::Body>(connector); - let mut incoming = listener.incoming(); - let srv = incoming - .try_next() - .map_err(|_| unreachable!()) - .and_then(|item| { - let socket = item.unwrap(); - Http::new() - .http2_only(true) - .serve_connection(socket, service_fn(|req| async move { - assert_eq!(req.headers().get("host"), None); - Ok::<_, hyper::Error>(Response::new(Body::empty())) - })) - }) - .map_err(|e| panic!("server error: {}", e)); - - rt.block_on(srv).unwrap(); + rt.spawn(async move { + let (socket, _addr) = listener.accept().await.expect("accept"); + Http::new() + .http2_only(true) + .serve_connection(socket, service_fn(|req| async move { + assert_eq!(req.headers().get("host"), None); + Ok::<_, hyper::Error>(Response::new(Body::empty())) + })) + .await + .expect("server"); + }); assert_eq!(connects.load(Ordering::SeqCst), 0); @@ -2097,7 +2091,6 @@ mod conn { assert_eq!(vec, b"bar=foo"); } - #[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_detect_conn_eof() { use futures_util::future; diff --git a/tests/server.rs b/tests/server.rs index 8483d385af..d246cac792 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -14,7 +14,7 @@ use std::net::{TcpStream, Shutdown, SocketAddr}; use std::io::{self, Read, Write}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; -use std::sync::{Arc}; +use std::sync::{Arc, Mutex}; use std::net::{TcpListener as StdTcpListener}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -29,12 +29,10 @@ use futures_util::stream::StreamExt; use futures_util::try_future::{self, TryFutureExt}; use futures_util::try_stream::TryStreamExt; use http::header::{HeaderName, HeaderValue}; +use tokio_net::driver::Handle; use tokio_net::tcp::{TcpListener, TcpStream as TkTcpStream}; use tokio::runtime::current_thread::Runtime; -use tokio::reactor::Handle; -use tokio::runtime::current_thread::Runtime; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_tcp::{TcpListener, TcpStream as TkTcpStream}; use tokio_timer::Delay; use hyper::{Body, Request, Response, StatusCode, Version}; @@ -306,7 +304,6 @@ mod response_body_lengths { }); } - #[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_auto_response_with_known_length() { use hyper::body::Payload; @@ -335,7 +332,6 @@ mod response_body_lengths { }).unwrap(); } - #[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_auto_response_with_conflicting_lengths() { use hyper::body::Payload; @@ -1522,7 +1518,6 @@ fn http1_response_with_http2_version() { }).unwrap(); } -#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn try_h2() { let server = serve(); @@ -1545,7 +1540,6 @@ fn try_h2() { assert_eq!(server.body(), b""); } -#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http1_only() { let server = serve_opts() @@ -1564,7 +1558,6 @@ fn http1_only() { }).unwrap_err(); } -#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_service_error_sends_reset_reason() { use std::error::Error; @@ -1596,7 +1589,6 @@ fn http2_service_error_sends_reset_reason() { assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); } -#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_body_user_error_sends_reset_reason() { use std::error::Error; @@ -1653,7 +1645,6 @@ impl hyper::service::Service for Svc { } } -#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_service_poll_ready_error_sends_goaway() { use std::error::Error; @@ -1742,7 +1733,7 @@ fn skips_content_length_and_body_for_304_responses() { struct Serve { addr: SocketAddr, msg_rx: mpsc::Receiver, - reply_tx: spmc::Sender, + reply_tx: Mutex>, shutdown_signal: Option>, thread: Option>, } @@ -1785,44 +1776,46 @@ impl Serve { type BoxError = Box; struct ReplyBuilder<'a> { - tx: &'a spmc::Sender, + tx: &'a Mutex>, } impl<'a> ReplyBuilder<'a> { fn status(self, status: hyper::StatusCode) -> Self { - self.tx.send(Reply::Status(status)).unwrap(); + self.tx.lock().unwrap().send(Reply::Status(status)).unwrap(); self } fn version(self, version: hyper::Version) -> Self { - self.tx.send(Reply::Version(version)).unwrap(); + self.tx.lock().unwrap().send(Reply::Version(version)).unwrap(); self } fn header>(self, name: &str, value: V) -> Self { let name = HeaderName::from_bytes(name.as_bytes()).expect("header name"); let value = HeaderValue::from_str(value.as_ref()).expect("header value"); - self.tx.send(Reply::Header(name, value)).unwrap(); + self.tx.lock().unwrap().send(Reply::Header(name, value)).unwrap(); self } fn body>(self, body: T) { - self.tx.send(Reply::Body(body.as_ref().to_vec().into())).unwrap(); + self.tx.lock().unwrap().send(Reply::Body(body.as_ref().to_vec().into())).unwrap(); } fn body_stream(self, body: Body) { - self.tx.send(Reply::Body(body)).unwrap(); + self.tx.lock().unwrap().send(Reply::Body(body)).unwrap(); } #[allow(dead_code)] fn error>(self, err: E) { - self.tx.send(Reply::Error(err.into())).unwrap(); + self.tx.lock().unwrap().send(Reply::Error(err.into())).unwrap(); } } impl<'a> Drop for ReplyBuilder<'a> { fn drop(&mut self) { - let _ = self.tx.send(Reply::End); + if let Ok(mut tx) = self.tx.lock() { + let _ = tx.send(Reply::End); + } } } @@ -2036,7 +2029,7 @@ impl ServeOptions { Serve { msg_rx: msg_rx, - reply_tx: reply_tx, + reply_tx: Mutex::new(reply_tx), addr: addr, shutdown_signal: Some(shutdown_tx), thread: Some(thread), diff --git a/tests/support/mod.rs b/tests/support/mod.rs index f5bc0bda75..bfdd19384a 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -1,16 +1,19 @@ pub extern crate hyper; pub extern crate tokio; +extern crate futures_util; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}}; -use std::time::{Duration, Instant}; +use std::time::{Duration/*, Instant*/}; use crate::hyper::{Body, Client, Request, Response, Server, Version}; use crate::hyper::client::HttpConnector; -use crate::hyper::service::service_fn; +use crate::hyper::service::{make_service_fn, service_fn}; pub use std::net::SocketAddr; -pub use self::futures::{future, Future, Stream}; -pub use self::futures_channel::oneshot; +pub use self::futures_util::{future, try_future, FutureExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _}; +//pub use self::futures_channel::oneshot; pub use self::hyper::{HeaderMap, StatusCode}; pub use self::tokio::runtime::current_thread::Runtime; @@ -324,10 +327,10 @@ pub fn __run_test(cfg: __TestConfig) { let serve_handles = Arc::new(Mutex::new( cfg.server_msgs )); - let new_service = move || { + let new_service = make_service_fn(move |_| { // Move a clone into the service_fn let serve_handles = serve_handles.clone(); - hyper::service::service_fn(move |req: Request| { + future::ok::<_, hyper::Error>(service_fn(move |req: Request| { let (sreq, sres) = serve_handles.lock() .unwrap() .remove(0); @@ -341,7 +344,7 @@ pub fn __run_test(cfg: __TestConfig) { let sbody = sreq.body; req.into_body() .try_concat() - .map(move |body| { + .map_ok(move |body| { assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); let mut res = Response::builder() @@ -351,8 +354,8 @@ pub fn __run_test(cfg: __TestConfig) { *res.headers_mut() = sres.headers; res }) - }) - }; + })) + }); let serve = hyper::server::conn::Http::new() .http2_only(cfg.server_version == 2) @@ -365,7 +368,7 @@ pub fn __run_test(cfg: __TestConfig) { let mut addr = serve.incoming_ref().local_addr(); let expected_connections = cfg.connections; let server = serve - .fold(0, move |cnt, connecting| { + .try_fold(0, move |cnt, connecting| { let cnt = cnt + 1; assert!( cnt <= expected_connections, @@ -374,14 +377,14 @@ pub fn __run_test(cfg: __TestConfig) { cnt ); let fut = connecting - .map_err(|never| -> hyper::Error { match never {} }) - .flatten() - .map_err(|e| panic!("server connection error: {}", e)); + .then(|res| res.expect("connecting")) + .map(|conn_res| conn_res.expect("server connection error")); crate::tokio::spawn(fut); - Ok::<_, hyper::Error>(cnt) + future::ok::<_, hyper::Error>(cnt) }) - .map(|_| ()) - .map_err(|e| panic!("serve error: {}", e)); + .map(|res| { + let _ = res.expect("serve error"); + }); rt.spawn(server); @@ -418,39 +421,37 @@ pub fn __run_test(cfg: __TestConfig) { } res.into_body().try_concat() }) - .map(move |body| { + .map_ok(move |body| { assert_eq!(body.as_ref(), cbody.as_slice(), "server body"); }) - .map_err(|e| panic!("client error: {}", e)) + .map(|res| res.expect("client error")) }); - let client_futures: Box + Send> = if cfg.parallel { + let client_futures: Pin + Send>> = if cfg.parallel { let mut client_futures = vec![]; for (creq, cres) in cfg.client_msgs { client_futures.push(make_request(&client, creq, cres)); } drop(client); - Box::new(future::join_all(client_futures).map(|_| ())) + Box::pin(future::join_all(client_futures).map(|_| ())) } else { - let mut client_futures: Box, Error=()> + Send> = - Box::new(future::ok(client)); + let mut client_futures: Pin> + Send>> = + Box::pin(future::ready(client)); for (creq, cres) in cfg.client_msgs { let mk_request = make_request.clone(); - client_futures = Box::new( + client_futures = Box::pin( client_futures - .and_then(move |client| { + .then(move |client| { let fut = mk_request(&client, creq, cres); fut.map(move |()| client) }) ); } - Box::new(client_futures.map(|_| ())) + Box::pin(client_futures.map(|_| ())) }; - let client_futures = client_futures.map(|_| ()); - rt.block_on(client_futures) - .expect("shutdown succeeded"); + rt.block_on(client_futures); } struct ProxyConfig { @@ -459,7 +460,7 @@ struct ProxyConfig { version: usize, } -fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) { +fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) { let client = Client::builder() .keep_alive_timeout(Duration::from_secs(10)) .http2_only(cfg.version == 2) @@ -470,19 +471,18 @@ fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future= prev + 1, "proxy max connections"); let client = client.clone(); - service_fn(move |mut req| { + future::ok::<_, hyper::Error>(service_fn(move |mut req| { let uri = format!("http://{}{}", dst_addr, req.uri().path()) .parse() .expect("proxy new uri parse"); *req.uri_mut() = uri; client.request(req) - }) - - }); + })) + })); let proxy_addr = srv.local_addr(); - (proxy_addr, srv.map_err(|err| panic!("proxy error: {}", err))) + (proxy_addr, srv.map(|res| res.expect("proxy error"))) }