diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 16b91661fb..4f1b921e61 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -38,6 +38,7 @@ where B: Payload, { conn: Connection>, + closing: Option<::Error>, } @@ -46,7 +47,6 @@ where T: AsyncRead + AsyncWrite, S: Service, S::Error: Into>, - //S::Future: Send + 'static, B: Payload, E: H2Exec, { @@ -66,7 +66,9 @@ where // fall-through, to replace state with Closed }, State::Serving(ref mut srv) => { - srv.conn.graceful_shutdown(); + if srv.closing.is_none() { + srv.conn.graceful_shutdown(); + } return; }, State::Closed => { @@ -82,7 +84,6 @@ where T: AsyncRead + AsyncWrite, S: Service, S::Error: Into>, - //S::Future: Send + 'static, B: Payload, E: H2Exec, { @@ -95,7 +96,8 @@ where State::Handshaking(ref mut h) => { let conn = try_ready!(h.poll().map_err(::Error::new_h2)); State::Serving(Serving { - conn: conn, + conn, + closing: None, }) }, State::Serving(ref mut srv) => { @@ -127,37 +129,57 @@ where S::Error: Into>, E: H2Exec, { - loop { - // At first, polls the readiness of supplied service. - match service.poll_ready() { - Ok(Async::Ready(())) => (), - Ok(Async::NotReady) => { - // use `poll_close` instead of `poll`, in order to avoid accepting a request. - try_ready!(self.conn.poll_close().map_err(::Error::new_h2)); - trace!("incoming connection complete"); - return Ok(Async::Ready(())); - } - Err(err) => { - trace!("service closed"); - return Err(::Error::new_user_service(err)); + if self.closing.is_none() { + loop { + // At first, polls the readiness of supplied service. + match service.poll_ready() { + Ok(Async::Ready(())) => (), + Ok(Async::NotReady) => { + // use `poll_close` instead of `poll`, in order to avoid accepting a request. + try_ready!(self.conn.poll_close().map_err(::Error::new_h2)); + trace!("incoming connection complete"); + return Ok(Async::Ready(())); + } + Err(err) => { + let err = ::Error::new_user_service(err); + debug!("service closed: {}", err); + + let reason = err.h2_reason(); + if reason == h2::Reason::NO_ERROR { + // NO_ERROR is only used for graceful shutdowns... + trace!("interpretting NO_ERROR user error as graceful_shutdown"); + self.conn.graceful_shutdown(); + } else { + trace!("abruptly shutting down with {:?}", reason); + self.conn.abrupt_shutdown(reason); + } + self.closing = Some(err); + break; + } } - } - // When the service is ready, accepts an incoming request. - if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { - trace!("incoming request"); - let content_length = content_length_parse_all(req.headers()); - let req = req.map(|stream| { - ::Body::h2(stream, content_length) - }); - let fut = H2Stream::new(service.call(req), respond); - exec.execute_h2stream(fut)?; - } else { - // no more incoming streams... - trace!("incoming connection complete"); - return Ok(Async::Ready(())) + // When the service is ready, accepts an incoming request. + if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { + trace!("incoming request"); + let content_length = content_length_parse_all(req.headers()); + let req = req.map(|stream| { + ::Body::h2(stream, content_length) + }); + let fut = H2Stream::new(service.call(req), respond); + exec.execute_h2stream(fut)?; + } else { + // no more incoming streams... + trace!("incoming connection complete"); + return Ok(Async::Ready(())) + } } } + + debug_assert!(self.closing.is_some(), "poll_server broke loop without closing"); + + try_ready!(self.conn.poll_close().map_err(::Error::new_h2)); + + Err(self.closing.take().expect("polled after error")) } } diff --git a/src/server/conn.rs b/src/server/conn.rs index 82b3ba7d2f..333a9689d0 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -555,7 +555,6 @@ where match polled { Ok(x) => return Ok(x), Err(e) => { - debug!("error polling connection protocol without shutdown: {}", e); match *e.kind() { Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => { self.upgrade_h2(); @@ -644,7 +643,6 @@ where } })), Err(e) => { - debug!("error polling connection protocol: {}", e); match *e.kind() { Kind::Parse(Parse::VersionH2) if self.fallback.to_h2() => { self.upgrade_h2(); @@ -962,7 +960,6 @@ mod upgrades { return Ok(Async::Ready(())); }, Err(e) => { - debug!("error polling connection protocol: {}", e); match *e.kind() { Kind::Parse(Parse::VersionH2) if self.inner.fallback.to_h2() => { self.inner.upgrade_h2(); diff --git a/tests/server.rs b/tests/server.rs index ea8b79b5b6..8baee02196 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1630,6 +1630,63 @@ fn http2_body_user_error_sends_reset_reason() { assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); } +#[test] +fn http2_service_poll_ready_error_sends_goaway() { + use std::error::Error; + + struct Svc; + + impl hyper::service::Service for Svc { + type ReqBody = hyper::Body; + type ResBody = hyper::Body; + type Error = h2::Error; + type Future = Box, + Error = Self::Error + > + Send + Sync>; + + fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { + Err(h2::Error::from(h2::Reason::INADEQUATE_SECURITY)) + } + + fn call(&mut self, _: hyper::Request) -> Self::Future { + unreachable!("poll_ready error should have shutdown conn"); + } + } + + let _ = pretty_env_logger::try_init(); + + let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into()) + .http2_only(true) + .serve(|| Ok::<_, BoxError>(Svc)); + + let addr_str = format!("http://{}", server.local_addr()); + + + let mut rt = Runtime::new().expect("runtime new"); + + rt.spawn(server.map_err(|e| unreachable!("server shouldn't error: {:?}", e))); + + let err = rt.block_on(hyper::rt::lazy(move || { + let client = Client::builder() + .http2_only(true) + .build_http::(); + let uri = addr_str.parse().expect("server addr should parse"); + + client + .get(uri) + })).unwrap_err(); + + // client request should have gotten the specific GOAWAY error... + let h2_err = err + .source() + .expect("source") + .downcast_ref::() + .expect("downcast"); + + assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); +} + // ------------------------------------------------- // the Server that is used to run all the tests with // -------------------------------------------------