Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 40 additions & 30 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ mod conn {
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use http_body_util::{BodyExt, Empty, Full, StreamBody};
use hyper::rt::Timer;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
use tokio::net::{TcpListener as TkTcpListener, TcpStream};

use hyper::body::{Body, Frame};
Expand Down Expand Up @@ -1530,6 +1530,21 @@ mod conn {
(listener, addr)
}

fn setup_duplex_test_server() -> (DuplexStream, DuplexStream, SocketAddr) {
use std::net::{IpAddr, Ipv6Addr};
setup_logger();

const BUF_SIZE: usize = 1024;
let (ioa, iob) = tokio::io::duplex(BUF_SIZE);

/// A test address inside the 'documentation' address range.
/// See: <https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml>
const TEST_ADDR: IpAddr = IpAddr::V6(Ipv6Addr::new(0x3fff, 0, 0, 0, 0, 0, 0, 1));
const TEST_SOCKET: SocketAddr = SocketAddr::new(TEST_ADDR, 8080);

(ioa, iob, TEST_SOCKET)
}

#[tokio::test]
async fn get() {
let (listener, addr) = setup_tk_test_server().await;
Expand Down Expand Up @@ -2307,16 +2322,14 @@ mod conn {
// Regression test for failure to fully close connections when using HTTP2 CONNECT
// We send 2 requests and then drop them. We should see the connection gracefully close.
use futures_util::future;
let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, addr) = setup_duplex_test_server();
let (tx, rxx) = oneshot::channel::<()>();

tokio::task::spawn(async move {
use hyper::server::conn::http2;
use hyper::service::service_fn;

let res = listener.accept().await;
let (stream, _) = res.unwrap();
let stream = TokioIo::new(stream);
let stream = TokioIo::new(server_io);

let service = service_fn(move |req: Request<hyper::body::Incoming>| {
tokio::task::spawn(async move {
Expand All @@ -2334,7 +2347,7 @@ mod conn {
});
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake(io)
.await
Expand Down Expand Up @@ -2380,11 +2393,11 @@ mod conn {

#[tokio::test]
async fn http2_keep_alive_detects_unresponsive_server() {
let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, _) = setup_duplex_test_server();

// spawn a server that reads but doesn't write
tokio::spawn(async move {
let mut sock = listener.accept().await.unwrap().0;
let mut sock = server_io;
let mut buf = [0u8; 1024];
loop {
let n = sock.read(&mut buf).await.expect("server read");
Expand All @@ -2395,7 +2408,7 @@ mod conn {
}
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let io = TokioIo::new(client_io);
let (_client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
Expand All @@ -2415,15 +2428,14 @@ mod conn {
// will use the default behavior which will NOT detect the server
// is unresponsive while no streams are active.

let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, _) = setup_duplex_test_server();

// spawn a server that reads but doesn't write
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
drain_til_eof(sock).await.expect("server read");
drain_til_eof(server_io).await.expect("server read");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
Expand All @@ -2446,15 +2458,14 @@ mod conn {

#[tokio::test]
async fn http2_keep_alive_closes_open_streams() {
let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, _addr) = setup_duplex_test_server();

// spawn a server that reads but doesn't write
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
drain_til_eof(sock).await.expect("server read");
drain_til_eof(server_io).await.expect("server read");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
Expand Down Expand Up @@ -2491,11 +2502,11 @@ mod conn {
// alive is enabled
use hyper::service::service_fn;

let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, _addr) = setup_duplex_test_server();

// Spawn an HTTP2 server that reads the whole body and responds
tokio::spawn(async move {
let sock = TokioIo::new(listener.accept().await.unwrap().0);
let sock = TokioIo::new(server_io);
hyper::server::conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.serve_connection(
Expand All @@ -2513,7 +2524,7 @@ mod conn {
.expect("serve_connection");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.keep_alive_interval(Duration::from_secs(1))
Expand Down Expand Up @@ -2598,11 +2609,11 @@ mod conn {

#[tokio::test]
async fn h2_connect() {
let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, _) = setup_duplex_test_server();

// Spawn an HTTP2 server that asks for bread and responds with baguette.
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
let sock = server_io;
let mut h2 = h2::server::handshake(sock).await.unwrap();

let (req, mut respond) = h2.accept().await.unwrap().unwrap();
Expand All @@ -2624,7 +2635,7 @@ mod conn {
assert!(body.data().await.is_none());
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake(io)
.await
Expand Down Expand Up @@ -2653,11 +2664,11 @@ mod conn {

#[tokio::test]
async fn h2_connect_rejected() {
let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, _) = setup_duplex_test_server();
let (done_tx, done_rx) = oneshot::channel();

tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
let sock = server_io;
let mut h2 = h2::server::handshake(sock).await.unwrap();

let (req, mut respond) = h2.accept().await.unwrap().unwrap();
Expand All @@ -2674,7 +2685,7 @@ mod conn {
done_rx.await.unwrap();
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.handshake::<_, Empty<Bytes>>(io)
.await
Expand Down Expand Up @@ -2703,16 +2714,15 @@ mod conn {

#[tokio::test]
async fn test_body_panics() {
let (listener, addr) = setup_tk_test_server().await;
let (client_io, server_io, _) = setup_duplex_test_server();

// spawn a server that reads but doesn't write
tokio::spawn(async move {
let sock = listener.accept().await.unwrap().0;
let sock = server_io;
drain_til_eof(sock).await.expect("server read");
});

let io = tcp_connect(&addr).await.expect("tcp connect");

let io = TokioIo::new(client_io);
let (mut client, conn) = conn::http1::Builder::new()
.handshake(io)
.await
Expand Down
Loading