Skip to content

Commit

Permalink
feat(body): update Body trait to use Frames (#3020)
Browse files Browse the repository at this point in the history
The `Body` trait was adjusted to be forwards compatible with adding new
frame types. That resulted in changing from `poll_data` and `poll_trailers`
to a single `poll_frame` function. More can be learned from the proposal
in #2840.

Closes #3010

BREAKING CHANGE: The polling functions of the `Body` trait have been
  redesigned.

  The free functions `hyper::body::to_bytes` and `aggregate` have been
  removed. Similar functionality is on
  `http_body_util::BodyExt::collect`.
  • Loading branch information
seanmonstar authored Oct 24, 2022
1 parent 91e83b7 commit 0888623
Show file tree
Hide file tree
Showing 19 changed files with 222 additions and 319 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
http-body = { git = "https://github.com/hyperium/http-body", rev = "6d7dd17" }
http-body-util = { git = "https://github.com/hyperium/http-body", rev = "6d7dd17" }
http-body = { git = "https://github.com/hyperium/http-body", rev = "0e20ca9" }
http-body-util = { git = "https://github.com/hyperium/http-body", rev = "0e20ca9" }
httpdate = "1.0"
httparse = "1.6"
h2 = { version = "0.3.9", optional = true }
Expand Down
12 changes: 7 additions & 5 deletions benches/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ extern crate test;
use bytes::Buf;
use futures_util::stream;
use futures_util::StreamExt;
use http_body_util::StreamBody;
use http_body::Frame;
use http_body_util::{BodyExt, StreamBody};

macro_rules! bench_stream {
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
Expand All @@ -21,7 +22,8 @@ macro_rules! bench_stream {
$bencher.iter(|| {
rt.block_on(async {
let $body_pat = StreamBody::new(
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)),
stream::iter(__s.iter())
.map(|&s| Ok::<_, std::convert::Infallible>(Frame::data(s))),
);

$block;
Expand All @@ -39,7 +41,7 @@ macro_rules! benches {
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let buf = hyper::body::aggregate(body).await.unwrap();
let buf = BodyExt::collect(body).await.unwrap().aggregate();
assert_eq!(buf.remaining(), total);
});
}
Expand All @@ -55,7 +57,7 @@ macro_rules! benches {
bench_stream!(b, bytes: $bytes, count: $count, total, mut body, {
let mut vec = Vec::new();
while let Some(chunk) = body.next().await {
vec.extend_from_slice(&chunk.unwrap());
vec.extend_from_slice(&chunk.unwrap().into_data().unwrap());
}
assert_eq!(vec.len(), total);
});
Expand All @@ -70,7 +72,7 @@ macro_rules! benches {
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let bytes = hyper::body::to_bytes(body).await.unwrap();
let bytes = BodyExt::collect(body).await.unwrap().to_bytes();
assert_eq!(bytes.len(), total);
});
}
Expand Down
5 changes: 3 additions & 2 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures_util::{stream, StreamExt};
use http_body_util::{BodyExt, Full, StreamBody};
use tokio::sync::oneshot;

use hyper::body::Frame;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::Response;
Expand Down Expand Up @@ -109,7 +110,7 @@ fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("content-length", "1000000"), move || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
BodyExt::boxed(StreamBody::new(
stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)),
stream::iter(S.iter()).map(|&s| Ok::<_, String>(Frame::data(s))),
))
})
}
Expand All @@ -133,7 +134,7 @@ fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("transfer-encoding", "chunked"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
BodyExt::boxed(StreamBody::new(
stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)),
stream::iter(S.iter()).map(|&s| Ok::<_, String>(Frame::data(s))),
))
})
}
Expand Down
12 changes: 7 additions & 5 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use std::env;

use bytes::Bytes;
use http_body_util::Empty;
use hyper::{body::Body as _, Request};
use http_body_util::{BodyExt, Empty};
use hyper::Request;
use tokio::io::{self, AsyncWriteExt as _};
use tokio::net::TcpStream;

Expand Down Expand Up @@ -62,9 +62,11 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {

// Stream the body, writing each chunk to stdout as we get it
// (instead of buffering and printing at the end).
while let Some(next) = res.data().await {
let chunk = next?;
io::stdout().write_all(&chunk).await?;
while let Some(next) = res.frame().await {
let frame = next?;
if let Some(chunk) = frame.data_ref() {
io::stdout().write_all(&chunk).await?;
}
}

println!("\n\nDone!");
Expand Down
4 changes: 2 additions & 2 deletions examples/client_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![warn(rust_2018_idioms)]

use bytes::Bytes;
use http_body_util::Empty;
use http_body_util::{BodyExt, Empty};
use hyper::{body::Buf, Request};
use serde::Deserialize;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -48,7 +48,7 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
let res = sender.send_request(req).await?;

// asynchronously aggregate the chunks of the body
let body = hyper::body::aggregate(res).await?;
let body = res.collect().await?.aggregate();

// try to parse as json with serde_json
let users = serde_json::from_reader(body.reader())?;
Expand Down
2 changes: 1 addition & 1 deletion examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async fn echo(req: Request<Recv>) -> Result<Response<BoxBody<Bytes, hyper::Error
return Ok(resp);
}

let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let whole_body = req.collect().await?.to_bytes();

let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>();
Ok(Response::new(full(reversed_body)))
Expand Down
2 changes: 1 addition & 1 deletion examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn param_example(
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))),
(&Method::POST, "/post") => {
// Concatenate the body...
let b = hyper::body::to_bytes(req).await?;
let b = req.collect().await?.to_bytes();
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
Expand Down
16 changes: 4 additions & 12 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use std::net::SocketAddr;
use std::rc::Rc;
use tokio::net::TcpListener;

use hyper::body::{Body as HttpBody, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::body::{Body as HttpBody, Bytes, Frame};
use hyper::service::service_fn;
use hyper::{Error, Response};
use std::marker::PhantomData;
Expand All @@ -33,18 +32,11 @@ impl HttpBody for Body {
type Data = Bytes;
type Error = Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Poll::Ready(self.get_mut().data.take().map(Ok))
}

fn poll_trailers(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
Poll::Ready(Ok(None))
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Poll::Ready(self.get_mut().data.take().map(|d| Ok(Frame::data(d))))
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn client_request_response() -> Result<Response<BoxBody>> {

async fn api_post_response(req: Request<Recv>) -> Result<Response<BoxBody>> {
// Aggregate the body...
let whole_body = hyper::body::aggregate(req).await?;
let whole_body = req.collect().await?.aggregate();
// Decode as JSON...
let mut data: serde_json::Value = serde_json::from_reader(whole_body.reader())?;
// Change the JSON...
Expand Down
31 changes: 0 additions & 31 deletions src/body/aggregate.rs

This file was deleted.

Loading

0 comments on commit 0888623

Please sign in to comment.