From 4b6099c7aa558e6b1fda146ce6179cb0c67858d7 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 13 Dec 2019 10:19:10 -0800 Subject: [PATCH] feat(body): implement `HttpBody` for `Request` and `Response` When the body type of a `Request` or `Response` implements `HttpBody`, the `Request` or `Response` itself now implements `HttpBody`. This allows writing things like `hyper::body::aggregate(req)` instead of `hyper::body::aggregate(req.into_body())`. Closes #2067 --- Cargo.toml | 2 +- examples/client.rs | 2 +- examples/client_json.rs | 2 +- examples/params.rs | 2 +- examples/web_api.rs | 2 +- src/body/body.rs | 2 +- src/body/to_bytes.rs | 2 +- src/client/mod.rs | 2 +- tests/client.rs | 20 ++++++++++---------- tests/server.rs | 2 +- tests/support/mod.rs | 4 ++-- 11 files changed, 21 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 819d6a6516..06eef653ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ futures-core = { version = "0.3", default-features = false } futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" -http-body = "0.3" +http-body = "0.3.1" httparse = "1.0" h2 = "0.2.1" itoa = "0.4.1" diff --git a/examples/client.rs b/examples/client.rs index de52d8a706..2333746993 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -42,7 +42,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> { // Stream the body, writing each chunk to stdout as we get it // (instead of buffering and printing at the end). - while let Some(next) = res.body_mut().data().await { + while let Some(next) = res.data().await { let chunk = next?; io::stdout().write_all(&chunk).await?; } diff --git a/examples/client_json.rs b/examples/client_json.rs index 9027f05e31..63dc4bff88 100644 --- a/examples/client_json.rs +++ b/examples/client_json.rs @@ -30,7 +30,7 @@ async fn fetch_json(url: hyper::Uri) -> Result> { let res = client.get(url).await?; // asynchronously aggregate the chunks of the body - let body = hyper::body::aggregate(res.into_body()).await?; + let body = hyper::body::aggregate(res).await?; // try to parse as json with serde_json let users = serde_json::from_reader(body.reader())?; diff --git a/examples/params.rs b/examples/params.rs index d3f2966e48..5904a41136 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -17,7 +17,7 @@ async fn param_example(req: Request) -> Result, hyper::Erro (&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())), (&Method::POST, "/post") => { // Concatenate the body... - let b = hyper::body::to_bytes(req.into_body()).await?; + let b = hyper::body::to_bytes(req).await?; // Parse the request body. form_urlencoded::parse // always succeeds, but in general parsing may // fail (for example, an invalid post of json), so diff --git a/examples/web_api.rs b/examples/web_api.rs index 7f30c65dfe..bc9e13787d 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -40,7 +40,7 @@ async fn client_request_response(client: &Client) -> Result) -> Result> { // Aggregate the body... - let whole_body = hyper::body::aggregate(req.into_body()).await?; + let whole_body = hyper::body::aggregate(req).await?; // Decode as JSON... let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?; // Change the JSON... diff --git a/src/body/body.rs b/src/body/body.rs index f57dd0f618..deb2db67d9 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -18,7 +18,7 @@ type BodySender = mpsc::Sender>; /// A stream of `Bytes`, used when receiving bodies. /// -/// A good default [`HttpBody`](crates::body::HttpBody) to use in many +/// A good default [`HttpBody`](crate::body::HttpBody) to use in many /// applications. #[must_use = "streams do nothing unless polled"] pub struct Body { diff --git a/src/body/to_bytes.rs b/src/body/to_bytes.rs index 6a8778d36d..4cce7857d7 100644 --- a/src/body/to_bytes.rs +++ b/src/body/to_bytes.rs @@ -5,7 +5,7 @@ use super::HttpBody; /// Concatenate the buffers from a body into a single `Bytes` asynchronously. /// /// This may require copying the data into a single buffer. If you don't need -/// a contiguous buffer, prefer the [`aggregate`](crate::body::aggregate) +/// a contiguous buffer, prefer the [`aggregate`](crate::body::aggregate()) /// function. pub async fn to_bytes(body: T) -> Result where diff --git a/src/client/mod.rs b/src/client/mod.rs index 5f652fb042..c19fb6cd2d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -40,7 +40,7 @@ //! println!("status: {}", res.status()); //! //! // Concatenate the body stream into a single buffer... -//! let buf = hyper::body::to_bytes(res.into_body()).await?; +//! let buf = hyper::body::to_bytes(res).await?; //! //! println!("body: {:?}", buf); //! # Ok(()) diff --git a/tests/client.rs b/tests/client.rs index bdd297ea5e..636b6d7ed2 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -146,7 +146,7 @@ macro_rules! test { ); )* - let body = rt.block_on(concat(res.into_body())) + let body = rt.block_on(concat(res)) .expect("body concat wait"); let expected_res_body = Option::<&[u8]>::from($response_body) @@ -1065,7 +1065,7 @@ mod dispatch_impl { .request(req) .and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }) .map_ok(|_| ()) }; @@ -1128,7 +1128,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }); let rx = rx1.expect("thread panicked"); @@ -1296,7 +1296,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }); let rx = rx1.expect("thread panicked"); @@ -1342,7 +1342,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }); let rx = rx1.expect("thread panicked"); @@ -2098,7 +2098,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }); let rx = rx1.expect("thread panicked"); let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); @@ -2144,7 +2144,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }); let rx = rx1.expect("thread panicked"); let rx = rx.then(|_| tokio::time::delay_for(Duration::from_millis(200))); @@ -2184,7 +2184,7 @@ mod conn { .unwrap(); let res1 = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }); // pipelined request will hit NotReady, and thus should return an Error::Cancel @@ -2258,7 +2258,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS); assert_eq!(res.headers()["Upgrade"], "foobar"); - concat(res.into_body()) + concat(res) }); let rx = rx1.expect("thread panicked"); @@ -2348,7 +2348,7 @@ mod conn { .send_request(req) .and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - concat(res.into_body()) + concat(res) }) .map_ok(|body| { assert_eq!(body.as_ref(), b""); diff --git a/tests/server.rs b/tests/server.rs index a7321864d0..994102887f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -1887,7 +1887,7 @@ impl tower_service::Service> for TestService { let replies = self.reply.clone(); Box::pin(async move { - while let Some(chunk) = req.body_mut().data().await { + while let Some(chunk) = req.data().await { match chunk { Ok(chunk) => { tx.send(Msg::Chunk(chunk.to_vec())).unwrap(); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 17095392f3..0893a41794 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -355,7 +355,7 @@ async fn async_test(cfg: __TestConfig) { func(&req.headers()); } let sbody = sreq.body; - hyper::body::to_bytes(req.into_body()).map_ok(move |body| { + hyper::body::to_bytes(req).map_ok(move |body| { assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); let mut res = Response::builder() @@ -410,7 +410,7 @@ async fn async_test(cfg: __TestConfig) { for func in &cheaders { func(&res.headers()); } - hyper::body::to_bytes(res.into_body()) + hyper::body::to_bytes(res) }) .map_ok(move |body| { assert_eq!(body.as_ref(), cbody.as_slice(), "server body");