diff --git a/tests/client.rs b/tests/client.rs index 1f1a456f95..d5a0e4e005 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1498,7 +1498,7 @@ mod conn { use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; use http_body_util::{BodyExt, Empty, Full, StreamBody}; use hyper::rt::Timer; - use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; + use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream}; use tokio::net::{TcpListener as TkTcpListener, TcpStream}; use hyper::body::{Body, Frame}; @@ -1530,6 +1530,21 @@ mod conn { (listener, addr) } + fn setup_duplex_test_server() -> (DuplexStream, DuplexStream, SocketAddr) { + use std::net::{IpAddr, Ipv6Addr}; + setup_logger(); + + const BUF_SIZE: usize = 1024; + let (ioa, iob) = tokio::io::duplex(BUF_SIZE); + + /// A test address inside the 'documentation' address range. + /// See: + const TEST_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::new(0x3fff, 0, 0, 0, 0, 0, 0, 1)); + const TEST_SOCKET: SocketAddr = SocketAddr::new(TEST_ADDR, 8080); + + (ioa, iob, TEST_SOCKET) + } + #[tokio::test] async fn get() { let (listener, addr) = setup_tk_test_server().await; @@ -2307,16 +2322,14 @@ mod conn { // Regression test for failure to fully close connections when using HTTP2 CONNECT // We send 2 requests and then drop them. We should see the connection gracefully close. use futures_util::future; - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, addr) = setup_duplex_test_server(); let (tx, rxx) = oneshot::channel::<()>(); tokio::task::spawn(async move { use hyper::server::conn::http2; use hyper::service::service_fn; - let res = listener.accept().await; - let (stream, _) = res.unwrap(); - let stream = TokioIo::new(stream); + let stream = TokioIo::new(server_io); let service = service_fn(move |req: Request| { tokio::task::spawn(async move { @@ -2334,7 +2347,7 @@ mod conn { }); }); - let io = tcp_connect(&addr).await.expect("tcp connect"); + let io = TokioIo::new(client_io); let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .handshake(io) .await @@ -2380,11 +2393,11 @@ mod conn { #[tokio::test] async fn http2_keep_alive_detects_unresponsive_server() { - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, _) = setup_duplex_test_server(); // spawn a server that reads but doesn't write tokio::spawn(async move { - let mut sock = listener.accept().await.unwrap().0; + let mut sock = server_io; let mut buf = [0u8; 1024]; loop { let n = sock.read(&mut buf).await.expect("server read"); @@ -2395,7 +2408,7 @@ mod conn { } }); - let io = tcp_connect(&addr).await.expect("tcp connect"); + let io = TokioIo::new(client_io); let (_client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) @@ -2415,15 +2428,14 @@ mod conn { // will use the default behavior which will NOT detect the server // is unresponsive while no streams are active. - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, _) = setup_duplex_test_server(); // spawn a server that reads but doesn't write tokio::spawn(async move { - let sock = listener.accept().await.unwrap().0; - drain_til_eof(sock).await.expect("server read"); + drain_til_eof(server_io).await.expect("server read"); }); - let io = tcp_connect(&addr).await.expect("tcp connect"); + let io = TokioIo::new(client_io); let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) @@ -2446,15 +2458,14 @@ mod conn { #[tokio::test] async fn http2_keep_alive_closes_open_streams() { - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, _addr) = setup_duplex_test_server(); // spawn a server that reads but doesn't write tokio::spawn(async move { - let sock = listener.accept().await.unwrap().0; - drain_til_eof(sock).await.expect("server read"); + drain_til_eof(server_io).await.expect("server read"); }); - let io = tcp_connect(&addr).await.expect("tcp connect"); + let io = TokioIo::new(client_io); let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) @@ -2491,11 +2502,11 @@ mod conn { // alive is enabled use hyper::service::service_fn; - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, _addr) = setup_duplex_test_server(); // Spawn an HTTP2 server that reads the whole body and responds tokio::spawn(async move { - let sock = TokioIo::new(listener.accept().await.unwrap().0); + let sock = TokioIo::new(server_io); hyper::server::conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .serve_connection( @@ -2513,7 +2524,7 @@ mod conn { .expect("serve_connection"); }); - let io = tcp_connect(&addr).await.expect("tcp connect"); + let io = TokioIo::new(client_io); let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .timer(TokioTimer) .keep_alive_interval(Duration::from_secs(1)) @@ -2598,11 +2609,11 @@ mod conn { #[tokio::test] async fn h2_connect() { - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, _) = setup_duplex_test_server(); // Spawn an HTTP2 server that asks for bread and responds with baguette. tokio::spawn(async move { - let sock = listener.accept().await.unwrap().0; + let sock = server_io; let mut h2 = h2::server::handshake(sock).await.unwrap(); let (req, mut respond) = h2.accept().await.unwrap().unwrap(); @@ -2624,7 +2635,7 @@ mod conn { assert!(body.data().await.is_none()); }); - let io = tcp_connect(&addr).await.expect("tcp connect"); + let io = TokioIo::new(client_io); let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .handshake(io) .await @@ -2653,11 +2664,11 @@ mod conn { #[tokio::test] async fn h2_connect_rejected() { - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, _) = setup_duplex_test_server(); let (done_tx, done_rx) = oneshot::channel(); tokio::spawn(async move { - let sock = listener.accept().await.unwrap().0; + let sock = server_io; let mut h2 = h2::server::handshake(sock).await.unwrap(); let (req, mut respond) = h2.accept().await.unwrap().unwrap(); @@ -2674,7 +2685,7 @@ mod conn { done_rx.await.unwrap(); }); - let io = tcp_connect(&addr).await.expect("tcp connect"); + let io = TokioIo::new(client_io); let (mut client, conn) = conn::http2::Builder::new(TokioExecutor) .handshake::<_, Empty>(io) .await @@ -2703,16 +2714,15 @@ mod conn { #[tokio::test] async fn test_body_panics() { - let (listener, addr) = setup_tk_test_server().await; + let (client_io, server_io, _) = setup_duplex_test_server(); // spawn a server that reads but doesn't write tokio::spawn(async move { - let sock = listener.accept().await.unwrap().0; + let sock = server_io; drain_til_eof(sock).await.expect("server read"); }); - let io = tcp_connect(&addr).await.expect("tcp connect"); - + let io = TokioIo::new(client_io); let (mut client, conn) = conn::http1::Builder::new() .handshake(io) .await