diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index efcd2af4a4..bb5cdcb666 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,7 +1,7 @@ use bytes::IntoBuf; use futures::{Async, Future, Poll, Stream}; use futures::future::{self, Either}; -use futures::sync::mpsc; +use futures::sync::{mpsc, oneshot}; use h2::client::{Builder, Handshake, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -18,6 +18,10 @@ type ClientRx = ::client::dispatch::Receiver, Response>; /// other handles to it have been dropped, so that it can shutdown. type ConnDropRef = mpsc::Sender; +/// A oneshot channel watches the `Connection` task, and when it completes, +/// the "dispatch" task will be notified and can shutdown sooner. +type ConnEof = oneshot::Receiver; + pub(crate) struct Client where B: Payload, @@ -29,7 +33,7 @@ where enum State where B: IntoBuf { Handshaking(Handshake), - Ready(SendRequest, ConnDropRef), + Ready(SendRequest, ConnDropRef, ConnEof), } impl Client @@ -66,6 +70,7 @@ where // in h2 where dropping all SendRequests won't notify a // parked Connection. let (tx, rx) = mpsc::channel(0); + let (cancel_tx, cancel_rx) = oneshot::channel(); let rx = rx.into_future() .map(|(msg, _)| match msg { Some(never) => match never {}, @@ -73,7 +78,10 @@ where }) .map_err(|_| -> Never { unreachable!("mpsc cannot error") }); let fut = conn - .inspect(|_| trace!("connection complete")) + .inspect(move |_| { + drop(cancel_tx); + trace!("connection complete") + }) .map_err(|e| debug!("connection error: {}", e)) .select2(rx) .then(|res| match res { @@ -92,10 +100,21 @@ where Err(Either::B((never, _))) => match never {}, }); self.executor.execute(fut)?; - State::Ready(request_tx, tx) + State::Ready(request_tx, tx, cancel_rx) }, - State::Ready(ref mut tx, ref conn_dropper) => { - try_ready!(tx.poll_ready().map_err(::Error::new_h2)); + State::Ready(ref mut tx, ref conn_dropper, ref mut cancel_rx) => { + match tx.poll_ready() { + Ok(Async::Ready(())) => (), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(err) => { + return if err.reason() == Some(::h2::Reason::NO_ERROR) { + trace!("connection gracefully shutdown"); + Ok(Async::Ready(Dispatched::Shutdown)) + } else { + Err(::Error::new_h2(err)) + }; + } + } match self.rx.poll() { Ok(Async::Ready(Some((req, cb)))) => { // check that future hasn't been canceled already @@ -157,7 +176,16 @@ where continue; }, - Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::NotReady) => { + match cancel_rx.poll() { + Ok(Async::Ready(never)) => match never {}, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(_conn_is_eof) => { + trace!("connection task is closed, closing dispatch task"); + return Ok(Async::Ready(Dispatched::Shutdown)); + } + } + }, Ok(Async::Ready(None)) => { trace!("client::dispatch::Sender dropped"); diff --git a/tests/client.rs b/tests/client.rs index bbc094adce..7ca32cc581 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -2095,6 +2095,57 @@ mod conn { assert_eq!(vec, b"bar=foo"); } + + #[test] + fn http2_detect_conn_eof() { + use futures::future; + use hyper::{Response, Server}; + use hyper::service::service_fn_ok; + use tokio::timer::Delay; + + let _ = pretty_env_logger::try_init(); + + let mut rt = Runtime::new().unwrap(); + + let server = Server::bind(&([127, 0, 0, 1], 0).into()) + .http2_only(true) + .serve(|| service_fn_ok(|_req| { + Response::new(Body::empty()) + })); + let addr = server.local_addr(); + let (shdn_tx, shdn_rx) = oneshot::channel(); + rt.spawn(server.with_graceful_shutdown(shdn_rx).map_err(|e| panic!("server error: {:?}", e))); + + let io = rt.block_on(tcp_connect(&addr)).expect("tcp connect"); + let (mut client, conn) = rt.block_on( + conn::Builder::new().http2_only(true).handshake::<_, Body>(io) + ).expect("http handshake"); + rt.spawn(conn.map_err(|e| panic!("client conn error: {:?}", e))); + + + // Sanity check that client is ready + rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready sanity"); + + let req = Request::builder() + .uri(format!("http://{}/", addr)) + .body(Body::empty()) + .expect("request builder"); + + rt.block_on(client.send_request(req)).expect("req1 send"); + + // Sanity check that client is STILL ready + rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready after"); + + // Trigger the server shutdown... + let _ = shdn_tx.send(()); + + // Allow time for graceful shutdown roundtrips... + rt.block_on(Delay::new(::std::time::Instant::now() + Duration::from_millis(100)).map_err(|e| panic!("delay error: {:?}", e))).expect("delay"); + + // After graceful shutdown roundtrips, the client should be closed... + rt.block_on(future::poll_fn(|| client.poll_ready())).expect_err("client should be closed"); + } + struct DebugStream { tcp: TcpStream, shutdown_called: bool,