Skip to content

Commit

Permalink
util: Add BodyStream. (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsky committed Aug 9, 2023
1 parent c9f1337 commit 63b67ae
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 2 deletions.
2 changes: 1 addition & 1 deletion http-body-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use self::either::Either;
pub use self::empty::Empty;
pub use self::full::Full;
pub use self::limited::{LengthLimitError, Limited};
pub use self::stream::StreamBody;
pub use self::stream::{BodyStream, StreamBody};

/// An extension trait for [`http_body::Body`] adding various combinators and adapters
pub trait BodyExt: http_body::Body {
Expand Down
98 changes: 97 additions & 1 deletion http-body-util/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,57 @@ impl<S: Stream> Stream for StreamBody<S> {
}
}

pin_project! {
/// A stream created from a [`Body`].
#[derive(Clone, Copy, Debug)]
pub struct BodyStream<B> {
#[pin]
body: B,
}
}

impl<B> BodyStream<B> {
/// Create a new `BodyStream`.
pub fn new(body: B) -> Self {
Self { body }
}
}

impl<B> Body for BodyStream<B>
where
B: Body,
{
type Data = B::Data;
type Error = B::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project().body.poll_frame(cx)
}
}

impl<B> Stream for BodyStream<B>
where
B: Body,
{
type Item = Result<Frame<B::Data>, B::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().body.poll_frame(cx) {
Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(test)]
mod tests {
use crate::{BodyExt, StreamBody};
use crate::{BodyExt, BodyStream, StreamBody};
use bytes::Bytes;
use futures_util::StreamExt;
use http_body::Frame;
use std::convert::Infallible;

Expand Down Expand Up @@ -105,4 +152,53 @@ mod tests {

assert!(body.frame().await.is_none());
}

#[tokio::test]
async fn stream_from_body() {
let chunks: Vec<Result<_, Infallible>> = vec![
Ok(Frame::data(Bytes::from(vec![1]))),
Ok(Frame::data(Bytes::from(vec![2]))),
Ok(Frame::data(Bytes::from(vec![3]))),
];
let stream = futures_util::stream::iter(chunks);
let body = StreamBody::new(stream);

let mut stream = BodyStream::new(body);

assert_eq!(
stream
.next()
.await
.unwrap()
.unwrap()
.into_data()
.unwrap()
.as_ref(),
[1]
);
assert_eq!(
stream
.next()
.await
.unwrap()
.unwrap()
.into_data()
.unwrap()
.as_ref(),
[2]
);
assert_eq!(
stream
.next()
.await
.unwrap()
.unwrap()
.into_data()
.unwrap()
.as_ref(),
[3]
);

assert!(stream.next().await.is_none());
}
}

0 comments on commit 63b67ae

Please sign in to comment.