From 63b67aea7536457a0c336f9c5ede3431f34de363 Mon Sep 17 00:00:00 2001 From: David Yamnitsky Date: Wed, 9 Aug 2023 15:26:04 -0400 Subject: [PATCH] util: Add BodyStream. (#91) --- http-body-util/src/lib.rs | 2 +- http-body-util/src/stream.rs | 98 +++++++++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 2 deletions(-) diff --git a/http-body-util/src/lib.rs b/http-body-util/src/lib.rs index 931ce61..059ada6 100644 --- a/http-body-util/src/lib.rs +++ b/http-body-util/src/lib.rs @@ -29,7 +29,7 @@ pub use self::either::Either; pub use self::empty::Empty; pub use self::full::Full; pub use self::limited::{LengthLimitError, Limited}; -pub use self::stream::StreamBody; +pub use self::stream::{BodyStream, StreamBody}; /// An extension trait for [`http_body::Body`] adding various combinators and adapters pub trait BodyExt: http_body::Body { diff --git a/http-body-util/src/stream.rs b/http-body-util/src/stream.rs index 79eea06..a6435f2 100644 --- a/http-body-util/src/stream.rs +++ b/http-body-util/src/stream.rs @@ -55,10 +55,57 @@ impl Stream for StreamBody { } } +pin_project! { + /// A stream created from a [`Body`]. + #[derive(Clone, Copy, Debug)] + pub struct BodyStream { + #[pin] + body: B, + } +} + +impl BodyStream { + /// Create a new `BodyStream`. + pub fn new(body: B) -> Self { + Self { body } + } +} + +impl Body for BodyStream +where + B: Body, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + self.project().body.poll_frame(cx) + } +} + +impl Stream for BodyStream +where + B: Body, +{ + type Item = Result, B::Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project().body.poll_frame(cx) { + Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + #[cfg(test)] mod tests { - use crate::{BodyExt, StreamBody}; + use crate::{BodyExt, BodyStream, StreamBody}; use bytes::Bytes; + use futures_util::StreamExt; use http_body::Frame; use std::convert::Infallible; @@ -105,4 +152,53 @@ mod tests { assert!(body.frame().await.is_none()); } + + #[tokio::test] + async fn stream_from_body() { + let chunks: Vec> = vec![ + Ok(Frame::data(Bytes::from(vec![1]))), + Ok(Frame::data(Bytes::from(vec![2]))), + Ok(Frame::data(Bytes::from(vec![3]))), + ]; + let stream = futures_util::stream::iter(chunks); + let body = StreamBody::new(stream); + + let mut stream = BodyStream::new(body); + + assert_eq!( + stream + .next() + .await + .unwrap() + .unwrap() + .into_data() + .unwrap() + .as_ref(), + [1] + ); + assert_eq!( + stream + .next() + .await + .unwrap() + .unwrap() + .into_data() + .unwrap() + .as_ref(), + [2] + ); + assert_eq!( + stream + .next() + .await + .unwrap() + .unwrap() + .into_data() + .unwrap() + .as_ref(), + [3] + ); + + assert!(stream.next().await.is_none()); + } }