Skip to content

Commit

Permalink
fix(client): early server response shouldn't propagate NO_ERROR (#3275)
Browse files Browse the repository at this point in the history
Closes #2872
  • Loading branch information
DDtKey authored Jul 26, 2023
1 parent 53b8372 commit 194e6f9
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 3 deletions.
11 changes: 10 additions & 1 deletion src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,16 @@ impl Body for Incoming {
ping.record_data(bytes.len());
return Poll::Ready(Some(Ok(Frame::data(bytes))));
}
Some(Err(e)) => return Poll::Ready(Some(Err(crate::Error::new_body(e)))),
Some(Err(e)) => {
return match e.reason() {
// These reasons should cause the body reading to stop, but not fail it.
// The same logic as for `Read for H2Upgraded` is applied here.
Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
Poll::Ready(None)
}
_ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
};
}
None => {
*data_done = true;
// fall through to trailers
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) enum BodyLength {
Unknown,
}

/// Status of when a Disaptcher future completes.
/// Status of when a Dispatcher future completes.
pub(crate) enum Dispatched {
/// Dispatcher completely shutdown connection.
Shutdown,
Expand Down
58 changes: 57 additions & 1 deletion tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ mod conn {
use bytes::{Buf, Bytes};
use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
use http_body_util::{BodyExt, Empty, StreamBody};
use http_body_util::{BodyExt, Empty, Full, StreamBody};
use hyper::rt::Timer;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio::net::{TcpListener as TkTcpListener, TcpStream};
Expand Down Expand Up @@ -2126,6 +2126,62 @@ mod conn {
.expect("client should be open");
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();

let (listener, addr) = setup_tk_test_server().await;

// Spawn an HTTP2 server that responds before reading the whole request body.
// It's normal case to decline the request due to headers or size of the body.
tokio::spawn(async move {
let sock = TokioIo::new(listener.accept().await.unwrap().0);
hyper::server::conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.serve_connection(
sock,
service_fn(|_req| async move {
Ok::<_, hyper::Error>(Response::new(Full::new(Bytes::from(
"No bread for you!",
))))
}),
)
.await
.expect("serve_connection");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.handshake(io)
.await
.expect("http handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

// Use a channel to keep request stream open
let (_tx, recv) = mpsc::channel::<Result<Frame<Bytes>, Box<dyn Error + Send + Sync>>>(0);
let req = Request::post("/a").body(StreamBody::new(recv)).unwrap();
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let mut body = String::new();
concat(resp.into_body())
.await
.unwrap()
.reader()
.read_to_string(&mut body)
.unwrap();

assert_eq!(&body, "No bread for you!");
}

#[tokio::test]
async fn h2_connect() {
let (listener, addr) = setup_tk_test_server().await;
Expand Down

0 comments on commit 194e6f9

Please sign in to comment.