feat(body): remove stream cargo feature
Remove the stream cargo feature and any usage of Stream, as it isn't stable and shouldn't be depended on.

closes issue hyperium#2855
oddgrd committed Jun 14, 2022
1 parent 5fa113e commit f855b2e
Showing 14 changed files with 536 additions and 750 deletions.
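
Note on the migration this commit applies in its own examples: bodies are now built from fully buffered data instead of wrapping a Stream behind the removed feature. A minimal sketch of that pattern, assuming a hyper 0.14-style Body (illustrative only, not code from this commit):

use hyper::Body;

// Before (required features = ["stream"]):
//     Body::wrap_stream(stream::iter(chunks.into_iter().map(Ok::<_, std::convert::Infallible>)))
// After: buffer the chunks first, then build the body from owned bytes.
fn buffered_body(chunks: Vec<Vec<u8>>) -> Body {
    let mut data = Vec::with_capacity(chunks.iter().map(Vec::len).sum());
    for chunk in chunks {
        data.extend_from_slice(&chunk);
    }
    Body::from(data)
}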
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
@@ -144,7 +144,7 @@ jobs:

- name: Test
# Can't enable tcp feature since Miri does not support the tokio runtime
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,stream,nightly
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --features http1,http2,client,server,nightly

features:
name: features
4 changes: 0 additions & 4 deletions Cargo.toml
@@ -80,7 +80,6 @@ full = [
"http1",
"http2",
"server",
"stream",
"runtime",
]

@@ -92,9 +91,6 @@ http2 = ["h2"]
client = []
server = []

# `impl Stream` for things
stream = []

# Tokio support
runtime = [
"tcp",
150 changes: 75 additions & 75 deletions benches/body.rs
@@ -1,88 +1,88 @@
#![feature(test)]
#![deny(warnings)]
// #![feature(test)]
// #![deny(warnings)]

extern crate test;
// extern crate test;

use bytes::Buf;
use futures_util::stream;
use futures_util::StreamExt;
use hyper::body::Body;
// use bytes::Buf;
// use futures_util::stream;
// use futures_util::StreamExt;
// use hyper::body::Body;

macro_rules! bench_stream {
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.expect("rt build");
// macro_rules! bench_stream {
// ($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
// let rt = tokio::runtime::Builder::new_current_thread()
// .build()
// .expect("rt build");

let $total_ident: usize = $bytes * $count;
$bencher.bytes = $total_ident as u64;
let __s: &'static [&'static [u8]] = &[&[b'x'; $bytes] as &[u8]; $count] as _;
// let $total_ident: usize = $bytes * $count;
// $bencher.bytes = $total_ident as u64;
// let __s: &'static [&'static [u8]] = &[&[b'x'; $bytes] as &[u8]; $count] as _;

$bencher.iter(|| {
rt.block_on(async {
let $body_pat = Body::wrap_stream(
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)),
);
$block;
});
});
}};
}
// $bencher.iter(|| {
// rt.block_on(async {
// let $body_pat =
// stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s));

macro_rules! benches {
($($name:ident, $bytes:expr, $count:expr;)+) => (
mod aggregate {
use super::*;
// $block;
// });
// });
// }};
// }

$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let buf = hyper::body::aggregate(body).await.unwrap();
assert_eq!(buf.remaining(), total);
});
}
)+
}
// macro_rules! benches {
// ($($name:ident, $bytes:expr, $count:expr;)+) => (
// mod aggregate {
// use super::*;

mod manual_into_vec {
use super::*;
// $(
// #[bench]
// fn $name(b: &mut test::Bencher) {
// bench_stream!(b, bytes: $bytes, count: $count, total, body, {
// let buf = hyper::body::aggregate(body).await.unwrap();
// assert_eq!(buf.remaining(), total);
// });
// }
// )+
// }

$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, mut body, {
let mut vec = Vec::new();
while let Some(chunk) = body.next().await {
vec.extend_from_slice(&chunk.unwrap());
}
assert_eq!(vec.len(), total);
});
}
)+
}
// mod manual_into_vec {
// use super::*;

mod to_bytes {
use super::*;
// $(
// #[bench]
// fn $name(b: &mut test::Bencher) {
// bench_stream!(b, bytes: $bytes, count: $count, total, mut body, {
// let mut vec = Vec::new();
// while let Some(chunk) = body.next().await {
// vec.extend_from_slice(&chunk.unwrap());
// }
// assert_eq!(vec.len(), total);
// });
// }
// )+
// }

$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let bytes = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(bytes.len(), total);
});
}
)+
}
)
}
// mod to_bytes {
// use super::*;

// ===== Actual Benchmarks =====
// $(
// #[bench]
// fn $name(b: &mut test::Bencher) {
// bench_stream!(b, bytes: $bytes, count: $count, total, body, {
// let bytes = hyper::body::to_bytes(body).await.unwrap();
// assert_eq!(bytes.len(), total);
// });
// }
// )+
// }
// )
// }

benches! {
bytes_1_000_count_2, 1_000, 2;
bytes_1_000_count_10, 1_000, 10;
bytes_10_000_count_1, 10_000, 1;
bytes_10_000_count_10, 10_000, 10;
}
// // ===== Actual Benchmarks =====

// benches! {
// bytes_1_000_count_2, 1_000, 2;
// bytes_1_000_count_10, 1_000, 10;
// bytes_10_000_count_1, 10_000, 1;
// bytes_10_000_count_10, 10_000, 10;
// }
31 changes: 15 additions & 16 deletions benches/server.rs
@@ -8,11 +8,10 @@ use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::time::Duration;

use futures_util::{stream, StreamExt};
use tokio::sync::oneshot;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};
use hyper::{Response, Server};

macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => {{
@@ -97,13 +96,13 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
))
}

#[bench]
fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("content-length", "1000000"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}
// #[bench]
// fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
// bench_server!(b, ("content-length", "1000000"), || {
// static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
// Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
// })
// }

#[bench]
fn throughput_chunked_small_payload(b: &mut test::Bencher) {
@@ -119,13 +118,13 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) {
))
}

#[bench]
fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
bench_server!(b, ("transfer-encoding", "chunked"), || {
static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
})
}
// #[bench]
// fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
// bench_server!(b, ("transfer-encoding", "chunked"), || {
// static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _;
// Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)))
// })
// }

#[bench]
fn raw_tcp_throughput_small_payload(b: &mut test::Bencher) {
26 changes: 12 additions & 14 deletions examples/echo.rs
@@ -1,8 +1,9 @@
#![deny(warnings)]

use futures_util::TryStreamExt;
use bytes::Buf;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::io::Read;

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
@@ -16,23 +17,20 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// Simply echo the body back to the client.
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),

// Convert to uppercase before sending back to client using a stream.
// Convert to uppercase before sending back to client.
(&Method::POST, "/echo/uppercase") => {
let chunk_stream = req.into_body().map_ok(|chunk| {
chunk
.iter()
.map(|byte| byte.to_ascii_uppercase())
.collect::<Vec<u8>>()
});
Ok(Response::new(Body::wrap_stream(chunk_stream)))
let body = hyper::body::aggregate(req.into_body()).await?;

let uppercase_body = body
.reader()
.bytes()
.map(|byte| byte.unwrap().to_ascii_uppercase())
.collect::<Vec<u8>>();

Ok(Response::new(uppercase_body.into()))
}

// Reverse the entire body before sending back to the client.
//
// Since we don't know the end yet, we can't simply stream
// the chunks as they arrive as we did with the above uppercase endpoint.
// So here we do `.await` on the future, waiting on concatenating the full body,
// then afterwards the content can be reversed. Only then can we return a `Response`.
(&Method::POST, "/echo/reversed") => {
let whole_body = hyper::body::to_bytes(req.into_body()).await?;

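
An equivalent way to write the new /echo/uppercase arm (illustrative only, not part of this commit) is to collect the body with to_bytes and uppercase the bytes directly, which avoids the per-byte Result that Read::bytes() yields:

use hyper::{Body, Request, Response};

// Hypothetical standalone version of the uppercase handler arm.
async fn uppercase(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    let whole_body = hyper::body::to_bytes(req.into_body()).await?;
    let upper: Vec<u8> = whole_body.iter().map(|b| b.to_ascii_uppercase()).collect();
    Ok(Response::new(Body::from(upper)))
}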
11 changes: 2 additions & 9 deletions examples/send_file.rs
@@ -1,9 +1,5 @@
#![deny(warnings)]

use tokio::fs::File;

use tokio_util::codec::{BytesCodec, FramedRead};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Result, Server, StatusCode};

@@ -48,11 +44,8 @@ fn not_found() -> Response<Body> {
}

async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
// Serve a file by asynchronously reading it by chunks using tokio-util crate.

if let Ok(file) = File::open(filename).await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
if let Ok(contents) = tokio::fs::read(filename).await {
let body = contents.into();
return Ok(Response::new(body));
}

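
The updated example reads the whole file into memory before building the response. If chunk-by-chunk streaming from disk is still wanted without Body::wrap_stream, one option is to forward chunks through a channel body. A sketch, assuming the 0.14-style Body::channel API (not part of this commit):

use hyper::body::Bytes;
use hyper::Body;
use tokio::io::AsyncReadExt;

// Read the file in 8 KiB chunks and push each one into a channel-backed body.
async fn streamed_file_body(path: &str) -> std::io::Result<Body> {
    let mut file = tokio::fs::File::open(path).await?;
    let (mut sender, body) = Body::channel();
    tokio::spawn(async move {
        let mut buf = vec![0u8; 8 * 1024];
        loop {
            match file.read(&mut buf).await {
                Ok(0) => break, // EOF
                Ok(n) => {
                    // Stop if the receiving side has gone away.
                    if sender.send_data(Bytes::copy_from_slice(&buf[..n])).await.is_err() {
                        break;
                    }
                }
                Err(_) => {
                    sender.abort();
                    break;
                }
            }
        }
    });
    Ok(body)
}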
15 changes: 3 additions & 12 deletions examples/web_api.rs
@@ -1,7 +1,6 @@
#![deny(warnings)]

use bytes::Buf;
use futures_util::{stream, StreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
@@ -24,18 +23,10 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
.unwrap();

let web_res = client.request(req).await?;
// Compare the JSON we sent (before) with what we received (after):
let before = stream::once(async {
Ok(format!(
"<b>POST request body</b>: {}<br><b>Response</b>: ",
POST_DATA,
)
.into())
});
let after = web_res.into_body();
let body = Body::wrap_stream(before.chain(after));

Ok(Response::new(body))
let res_body = web_res.into_body();

Ok(Response::new(res_body))
}

async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
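
The updated client_request_response no longer prepends the "POST request body" prefix, it just forwards the upstream body. If that prefixed response is still wanted without Body::wrap_stream, the upstream body can be buffered and concatenated. A sketch (illustrative only; the helper name is hypothetical):

use hyper::{Body, Response};

// Buffer the upstream body and glue it onto a prefix string.
async fn prefixed_response(prefix: String, upstream: Body) -> Result<Response<Body>, hyper::Error> {
    let after = hyper::body::to_bytes(upstream).await?;
    let mut combined = prefix.into_bytes();
    combined.extend_from_slice(&after);
    Ok(Response::new(Body::from(combined)))
}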