Skip to content

Commit

Permalink
feat(body): remove stream cargo feature
Browse files Browse the repository at this point in the history
remove stream cargo feature and any usage of stream, as it isn't stable and shouldn't be depended on

closes issue hyperium#2855
  • Loading branch information
oddgrd committed May 25, 2022
1 parent 775fac1 commit b44898c
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 147 deletions.
4 changes: 0 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ full = [
"http1",
"http2",
"server",
"stream",
"runtime",
]

Expand All @@ -90,9 +89,6 @@ http2 = ["h2"]
client = []
server = []

# `impl Stream` for things
stream = []

# Tokio support
runtime = [
"tcp",
Expand Down
11 changes: 2 additions & 9 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#![deny(warnings)]

use tokio::fs::File;

use tokio_util::codec::{BytesCodec, FramedRead};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Result, Server, StatusCode};

Expand Down Expand Up @@ -48,11 +44,8 @@ fn not_found() -> Response<Body> {
}

async fn simple_file_send(filename: &str) -> Result<Response<Body>> {
// Serve a file by asynchronously reading it by chunks using tokio-util crate.

if let Ok(file) = File::open(filename).await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
if let Ok(contents) = tokio::fs::read(filename).await {
let body = contents.into();
return Ok(Response::new(body));
}

Expand Down
15 changes: 3 additions & 12 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![deny(warnings)]

use bytes::Buf;
use futures_util::{stream, StreamExt};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Client, Method, Request, Response, Server, StatusCode};
Expand All @@ -24,18 +23,10 @@ async fn client_request_response(client: &Client<HttpConnector>) -> Result<Respo
.unwrap();

let web_res = client.request(req).await?;
// Compare the JSON we sent (before) with what we received (after):
let before = stream::once(async {
Ok(format!(
"<b>POST request body</b>: {}<br><b>Response</b>: ",
POST_DATA,
)
.into())
});
let after = web_res.into_body();
let body = Body::wrap_stream(before.chain(after));

Ok(Response::new(body))
let res_body = web_res.into_body();

Ok(Response::new(res_body))
}

async fn api_post_response(req: Request<Body>) -> Result<Response<Body>> {
Expand Down
82 changes: 0 additions & 82 deletions src/body/body.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
use std::borrow::Cow;
#[cfg(feature = "stream")]
use std::error::Error as StdError;
use std::fmt;

use bytes::Bytes;
use futures_channel::mpsc;
use futures_channel::oneshot;
use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")]
use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use super::DecodedLength;
#[cfg(feature = "stream")]
use crate::common::sync_wrapper::SyncWrapper;
use crate::common::Future;
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
use crate::common::Never;
Expand Down Expand Up @@ -56,12 +50,6 @@ enum Kind {
},
#[cfg(feature = "ffi")]
Ffi(crate::ffi::UserBody),
#[cfg(feature = "stream")]
Wrapped(
SyncWrapper<
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
}

struct Extra {
Expand Down Expand Up @@ -164,39 +152,6 @@ impl Body {
(tx, rx)
}

/// Wrap a futures `Stream` in a box inside `Body`.
///
/// # Example
///
/// ```
/// # use hyper::Body;
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
/// Ok("hello"),
/// Ok(" "),
/// Ok("world"),
/// ];
///
/// let stream = futures_util::stream::iter(chunks);
///
/// let body = Body::wrap_stream(stream);
/// ```
///
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S, O, E>(stream: S) -> Body
where
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}

fn new(kind: Kind) -> Body {
Body { kind, extra: None }
}
Expand Down Expand Up @@ -329,12 +284,6 @@ impl Body {

#[cfg(feature = "ffi")]
Kind::Ffi(ref mut body) => body.poll_data(cx),

#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
}
}

Expand Down Expand Up @@ -405,8 +354,6 @@ impl HttpBody for Body {
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "ffi")]
Kind::Ffi(..) => false,
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false,
}
}

Expand All @@ -426,8 +373,6 @@ impl HttpBody for Body {
match self.kind {
Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
Kind::Once(None) => SizeHint::with_exact(0),
#[cfg(feature = "stream")]
Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } => opt_len!(content_length),
#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
Kind::H2 { content_length, .. } => opt_len!(content_length),
Expand Down Expand Up @@ -457,33 +402,6 @@ impl fmt::Debug for Body {
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
type Item = crate::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
HttpBody::poll_data(self, cx)
}
}

/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
#[inline]
fn from(
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
) -> Body {
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
}
}

impl From<Bytes> for Body {
#[inline]
fn from(chunk: Bytes) -> Body {
Expand Down
40 changes: 0 additions & 40 deletions src/server/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
//! connections.
//! - Utilities like `poll_fn` to ease creating a custom `Accept`.
#[cfg(feature = "stream")]
use futures_core::Stream;
#[cfg(feature = "stream")]
use pin_project_lite::pin_project;

use crate::common::{
task::{self, Poll},
Pin,
Expand Down Expand Up @@ -74,38 +69,3 @@ where

PollFn(func)
}

/// Adapt a `Stream` of incoming connections into an `Accept`.
///
/// # Optional
///
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
where
S: Stream<Item = Result<IO, E>>,
{
pin_project! {
struct FromStream<S> {
#[pin]
stream: S,
}
}

impl<S, IO, E> Accept for FromStream<S>
where
S: Stream<Item = Result<IO, E>>,
{
type Conn = IO;
type Error = E;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
self.project().stream.poll_next(cx)
}
}

FromStream { stream }
}

0 comments on commit b44898c

Please sign in to comment.