Skip to content

Commit

Permalink
feat(body): remove stream cargo feature
Browse files Browse the repository at this point in the history
remove stream cargo feature and any usage of stream, as it isn't stable and shouldn't be depended on

closes issue hyperium#2855
  • Loading branch information
oddgrd committed May 26, 2022
1 parent 4678be9 commit f97d19c
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 235 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ full = [
"http1",
"http2",
"server",
"stream",
"runtime",
]

Expand All @@ -90,9 +89,6 @@ http2 = ["h2"]
client = []
server = []

# `impl Stream` for things
stream = []

# Tokio support
runtime = [
"tcp",
Expand Down
26 changes: 12 additions & 14 deletions examples/echo.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#![deny(warnings)]

use futures_util::TryStreamExt;
use bytes::Buf;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::io::Read;

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
Expand All @@ -16,23 +17,20 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// Simply echo the body back to the client.
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),

// Convert to uppercase before sending back to client using a stream.
// Convert to uppercase before sending back to client.
(&Method::POST, "/echo/uppercase") => {
let chunk_stream = req.into_body().map_ok(|chunk| {
chunk
.iter()
.map(|byte| byte.to_ascii_uppercase())
.collect::<Vec<u8>>()
});
Ok(Response::new(Body::wrap_stream(chunk_stream)))
let body = hyper::body::aggregate(req.into_body()).await?;

let uppercase_body = body
.reader()
.bytes()
.map(|byte| byte.unwrap().to_ascii_uppercase())
.collect::<Vec<u8>>();

Ok(Response::new(uppercase_body.into()))
}

// Reverse the entire body before sending back to the client.
//
// Since we don't know the end yet, we can't simply stream
// the chunks as they arrive as we did with the above uppercase endpoint.
// So here we do `.await` on the future, waiting on concatenating the full body,
// then afterwards the content can be reversed. Only then can we return a `Response`.
(&Method::POST, "/echo/reversed") => {
let whole_body = hyper::body::to_bytes(req.into_body()).await?;

Expand Down
11 changes: 2 additions & 9 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#![deny(warnings)]

use tokio::fs::File;

use tokio_util::codec::{BytesCodec, FramedRead};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Result, Server, StatusCode};

Expand Down Expand Up @@ -48,11 +44,8 @@ fn not_found() -> Response<Body> {
}

async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
// Serve a file by asynchronously reading it by chunks using tokio-util crate.

if let Ok(file) = File::open(filename).await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
if let Ok(contents) = tokio::fs::read(filename).await {
let body = contents.into();
return Ok(Response::new(body));
}

Expand Down
15 changes: 3 additions & 12 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![deny(warnings)]

use bytes::Buf;
use futures_util::{stream, StreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
Expand All @@ -24,18 +23,10 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
.unwrap();

let web_res = client.request(req).await?;
// Compare the JSON we sent (before) with what we received (after):
let before = stream::once(async {
Ok(format!(
"<b>POST request body</b>: {}<br><b>Response</b>: ",
POST_DATA,
)
.into())
});
let after = web_res.into_body();
let body = Body::wrap_stream(before.chain(after));

Ok(Response::new(body))
let res_body = web_res.into_body();

Ok(Response::new(res_body))
}

async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
Expand Down
82 changes: 0 additions & 82 deletions src/body/body.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
use std::borrow::Cow;
#[cfg(feature = "stream")]
use std::error::Error as StdError;
use std::fmt;

use bytes::Bytes;
use futures_channel::mpsc;
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")]
use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
use crate::common::Never;
Expand Down Expand Up @@ -56,12 +50,6 @@ enum Kind {
},
#[cfg(feature = "ffi")]
Ffi(crate::ffi::UserBody),
#[cfg(feature = "stream")]
Wrapped(
SyncWrapper<
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
}

struct Extra {
Expand Down Expand Up @@ -164,39 +152,6 @@ impl Body {
(tx, rx)
}

/// Wrap a futures `Stream` in a box inside `Body`.
///
/// # Example
///
/// ```
/// # use hyper::Body;
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
/// Ok("hello"),
/// Ok(" "),
/// Ok("world"),
/// ];
///
/// let stream = futures_util::stream::iter(chunks);
///
/// let body = Body::wrap_stream(stream);
/// ```
///
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S, O, E>(stream: S) -> Body
where
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}

fn new(kind: Kind) -> Body {
Body { kind, extra: None }
}
Expand Down Expand Up @@ -329,12 +284,6 @@ impl Body {

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),

#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
}
}

Expand Down Expand Up @@ -405,8 +354,6 @@ impl HttpBody for Body {
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => false,
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false,
}
}

Expand All @@ -426,8 +373,6 @@ impl HttpBody for Body {
match self.kind {
Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
#[cfg(feature = "stream")]
Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } => opt_len!(content_length),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { content_length, .. } => opt_len!(content_length),
Expand Down Expand Up @@ -457,33 +402,6 @@ impl fmt::Debug for Body {
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
type Item = crate::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
HttpBody::poll_data(self, cx)
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
#[inline]
fn from(
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
) -> Body {
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
}
}

impl From<Bytes> for Body {
#[inline]
fn from(chunk: Bytes) -> Body {
Expand Down
5 changes: 1 addition & 4 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ pub(crate) mod io;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
mod lazy;
mod never;
#[cfg(any(
feature = "stream",
all(feature = "client", any(feature = "http1", feature = "http2"))
))]
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
//! - `runtime`: Enables convenient integration with `tokio`, providing
//! connectors and acceptors for TCP, and a default executor.
//! - `tcp`: Enables convenient implementations over TCP (using tokio).
//! - `stream`: Provides `futures::Stream` capabilities.
//!
//! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section
Expand Down
40 changes: 0 additions & 40 deletions src/server/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
//! connections.
//! - Utilities like `poll_fn` to ease creating a custom `Accept`.
#[cfg(feature = "stream")]
use futures_core::Stream;
#[cfg(feature = "stream")]
use pin_project_lite::pin_project;

use crate::common::{
task::{self, Poll},
Pin,
Expand Down Expand Up @@ -74,38 +69,3 @@ where

PollFn(func)
}

/// Adapt a `Stream` of incoming connections into an `Accept`.
///
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
where
S: Stream<Item = Result<IO, E>>,
{
pin_project! {
struct FromStream<S> {
#[pin]
stream: S,
}
}

impl<S, IO, E> Accept for FromStream<S>
where
S: Stream<Item = Result<IO, E>>,
{
type Conn = IO;
type Error = E;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
self.project().stream.poll_next(cx)
}
}

FromStream { stream }
}
10 changes: 5 additions & 5 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ test! {
headers: {
"Content-Length" => "5",
},
body: (Body::wrap_stream(Body::from("hello"))),
body: (Body::from("hello")),
},
response:
status: OK,
Expand Down Expand Up @@ -505,7 +505,7 @@ test! {
//
// But since the headers cannot tell us, and the method typically
// doesn't have a body, the body must be ignored.
body: (Body::wrap_stream(Body::from("hello"))),
body: (Body::from("hello")),
},
response:
status: OK,
Expand Down Expand Up @@ -536,7 +536,7 @@ test! {
// but we're wrapping a non-empty stream.
//
// But since the headers cannot tell us, the body must be ignored.
body: (Body::wrap_stream(Body::from("hello"))),
body: (Body::from("hello")),
},
response:
status: OK,
Expand Down Expand Up @@ -621,7 +621,7 @@ test! {
request: {
method: POST,
url: "http://{addr}/chunks",
body: (Body::wrap_stream(Body::from("foo bar baz"))),
body: (Body::from("foo bar baz")),
},
response:
status: OK,
Expand Down Expand Up @@ -1712,7 +1712,7 @@ mod dispatch_impl {
let req = Request::builder()
.method("POST")
.uri(&*format!("http://{}/a", addr))
.body(Body::wrap_stream(delayed_body))
.body(delayed_body)
.unwrap();
let client2 = client.clone();

Expand Down
Loading

0 comments on commit f97d19c

Please sign in to comment.