Skip to content

Commit

Permalink
fix(http1): http1 server graceful shutdown fix (#3261)
Browse files Browse the repository at this point in the history
fix issue in the graceful shutdown logic
which causes the connection future to hang
when graceful shutdown is called prior to any
requests being  made. This fix checks to see
if the connection is still in its initial state
when disable_keep_alive is called, and starts the
shutdown process if it is.

This addresses issue #2730

Co-authored-by: Robin Seitz <[email protected]>
  • Loading branch information
seanmonstar and Robin Seitz authored Jul 6, 2023
1 parent d92d391 commit f4b5130
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
7 changes: 7 additions & 0 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,13 @@ where
}
}

#[cfg(feature = "server")]
pub(crate) fn has_initial_read_write_state(&self) -> bool {
matches!(self.state.reading, Reading::Init)
&& matches!(self.state.writing, Writing::Init)
&& self.io.read_buf().is_empty()
}

fn should_error_on_eof(&self) -> bool {
// If we're idle, it's probably just the connection closing gracefully.
T::should_error_on_parse_eof() && !self.state.is_idle()
Expand Down
6 changes: 5 additions & 1 deletion src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ where
#[cfg(feature = "server")]
pub(crate) fn disable_keep_alive(&mut self) {
self.conn.disable_keep_alive();
if self.conn.is_write_closed() {

// If keep alive has been disabled and no read or write has been seen on
// the connection yet, we must be in a state where the server is being asked to
// shut down before any data has been seen on the connection
if self.conn.is_write_closed() || self.conn.has_initial_read_write_state() {
self.close();
}
}
Expand Down
32 changes: 32 additions & 0 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use hyper::body::{Body, Incoming as IncomingBody};
use hyper::server::conn::{http1, http2};
use hyper::service::{service_fn, Service};
use hyper::{Method, Request, Response, StatusCode, Uri, Version};
use tokio::pin;

mod support;

Expand Down Expand Up @@ -1139,11 +1140,17 @@ async fn disable_keep_alive_mid_request() {
let child = thread::spawn(move || {
let mut req = connect(&addr);
req.write_all(b"GET / HTTP/1.1\r\n").unwrap();
thread::sleep(Duration::from_millis(10));
tx1.send(()).unwrap();
rx2.recv().unwrap();
req.write_all(b"Host: localhost\r\n\r\n").unwrap();
let mut buf = vec![];
req.read_to_end(&mut buf).unwrap();
assert!(
buf.starts_with(b"HTTP/1.1 200 OK\r\n"),
"should receive OK response, but buf: {:?}",
buf,
);
});

let (socket, _) = listener.accept().await.unwrap();
Expand Down Expand Up @@ -2152,6 +2159,31 @@ async fn max_buf_size() {
.expect_err("should TooLarge error");
}

#[cfg(feature = "http1")]
#[tokio::test]
async fn graceful_shutdown_before_first_request_no_block() {
let (listener, addr) = setup_tcp_listener();

tokio::spawn(async move {
let socket = listener.accept().await.unwrap().0;

let future = http1::Builder::new().serve_connection(socket, HelloWorld);
pin!(future);
future.as_mut().graceful_shutdown();

future.await.unwrap();
});

let mut stream = TkTcpStream::connect(addr).await.unwrap();

let mut buf = vec![];

tokio::time::timeout(Duration::from_secs(5), stream.read_to_end(&mut buf))
.await
.expect("timed out waiting for graceful shutdown")
.expect("error receiving response");
}

#[test]
fn streaming_body() {
use futures_util::StreamExt;
Expand Down

0 comments on commit f4b5130

Please sign in to comment.