diff --git a/src/lib.rs b/src/lib.rs index 99a7efc54..b2790e9cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -333,19 +333,17 @@ fn _assert_impls() { assert_sync::(); assert_clone::(); - assert_send::(); - assert_send::(); + assert_send::(); + assert_sync::(); #[cfg(not(target_arch = "wasm32"))] { + assert_send::(); + assert_send::(); assert_send::(); + assert_send::(); + assert_sync::(); } - - assert_send::(); - assert_sync::(); - - assert_send::(); - assert_sync::(); } if_hyper! { diff --git a/src/wasm/body.rs b/src/wasm/body.rs index 8d40c503f..2191e779b 100644 --- a/src/wasm/body.rs +++ b/src/wasm/body.rs @@ -1,10 +1,20 @@ #[cfg(feature = "multipart")] use super::multipart::Form; +use super::AbortGuard; /// dox use bytes::Bytes; +#[cfg(feature = "stream")] +use futures_core::Stream; +#[cfg(feature = "stream")] +use futures_util::stream::{self, StreamExt}; use js_sys::Uint8Array; +#[cfg(feature = "stream")] +use std::pin::Pin; use std::{borrow::Cow, fmt}; +#[cfg(feature = "stream")] +use wasm_bindgen::JsCast; use wasm_bindgen::JsValue; +use web_sys::Response as WebResponse; /// The body of a `Request`. /// @@ -22,6 +32,7 @@ enum Inner { /// MultipartForm holds a multipart/form-data body. #[cfg(feature = "multipart")] MultipartForm(Form), + Streaming(StreamingBody), } #[derive(Clone)] @@ -58,6 +69,70 @@ impl Single { } } +struct StreamingBody { + response: WebResponse, + abort: AbortGuard, +} + +impl StreamingBody { + #[cfg(feature = "stream")] + fn into_stream(self) -> Pin>>> { + let StreamingBody { response, abort } = self; + + if let Some(body) = response.body() { + let abort = abort; + let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into()); + Box::pin(body.into_stream().map(move |buf_js| { + // Keep the abort guard alive while the stream is active. + let _abort = &abort; + let buf_js = buf_js + .map_err(crate::error::wasm) + .map_err(crate::error::decode)?; + let buffer = Uint8Array::new(&buf_js); + let mut bytes = vec![0; buffer.length() as usize]; + buffer.copy_to(&mut bytes); + Ok(bytes.into()) + })) + } else { + drop(abort); + Box::pin(stream::empty()) + } + } + + async fn into_bytes(self) -> crate::Result { + let StreamingBody { response, abort } = self; + let promise = response + .array_buffer() + .map_err(crate::error::wasm) + .map_err(crate::error::decode)?; + let js_value = super::promise::(promise) + .await + .map_err(crate::error::decode)?; + drop(abort); + let buffer = Uint8Array::new(&js_value); + let mut bytes = vec![0; buffer.length() as usize]; + buffer.copy_to(&mut bytes); + Ok(bytes.into()) + } + + async fn into_text(self) -> crate::Result { + let StreamingBody { response, abort } = self; + let promise = response + .text() + .map_err(crate::error::wasm) + .map_err(crate::error::decode)?; + let js_value = super::promise::(promise) + .await + .map_err(crate::error::decode)?; + drop(abort); + if let Some(text) = js_value.as_string() { + Ok(text) + } else { + Err(crate::error::decode("response.text isn't string")) + } + } +} + impl Body { /// Returns a reference to the internal data of the `Body`. /// @@ -68,6 +143,7 @@ impl Body { Inner::Single(single) => Some(single.as_bytes()), #[cfg(feature = "multipart")] Inner::MultipartForm(_) => None, + Inner::Streaming(_) => None, } } @@ -80,6 +156,9 @@ impl Body { let js_value: &JsValue = form_data.as_ref(); Ok(js_value.to_owned()) } + Inner::Streaming(_) => Err(crate::error::decode( + "streaming body cannot be converted to JsValue", + )), } } @@ -117,6 +196,7 @@ impl Body { Inner::Single(single) => single.is_empty(), #[cfg(feature = "multipart")] Inner::MultipartForm(form) => form.is_empty(), + Inner::Streaming(_) => false, } } @@ -127,6 +207,63 @@ impl Body { }), #[cfg(feature = "multipart")] Inner::MultipartForm(_) => None, + Inner::Streaming(_) => None, + } + } + + pub(super) fn from_response(response: WebResponse, abort: AbortGuard) -> Body { + if response.body().is_some() { + Body { + inner: Inner::Streaming(StreamingBody { response, abort }), + } + } else { + // Even without a body, ensure the guard lives until completion. + drop(abort); + Body::default() + } + } + + /// Consume the body into bytes. + pub async fn bytes(self) -> crate::Result { + match self.inner { + Inner::Single(Single::Bytes(bytes)) => Ok(bytes), + Inner::Single(Single::Text(text)) => Ok(Bytes::copy_from_slice(text.as_bytes())), + #[cfg(feature = "multipart")] + Inner::MultipartForm(_) => Err(crate::error::decode( + "multipart body cannot be converted into bytes", + )), + Inner::Streaming(streaming) => streaming.into_bytes().await, + } + } + + /// Consume the body into a UTF-8 string. + pub async fn text(self) -> crate::Result { + match self.inner { + Inner::Single(Single::Bytes(bytes)) => String::from_utf8(bytes.to_vec()) + .map_err(|_| crate::error::decode("body is not valid UTF-8")), + Inner::Single(Single::Text(text)) => Ok(text.into_owned()), + #[cfg(feature = "multipart")] + Inner::MultipartForm(_) => Err(crate::error::decode( + "multipart body cannot be converted into text", + )), + Inner::Streaming(streaming) => streaming.into_text().await, + } + } + + /// Convert the body into a stream of `Bytes`. + #[cfg(feature = "stream")] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + pub fn bytes_stream(self) -> Pin>>> { + match self.inner { + Inner::Single(single) => { + let bytes = Bytes::copy_from_slice(single.as_bytes()); + Box::pin(stream::once(async move { Ok(bytes) })) + } + #[cfg(feature = "multipart")] + Inner::MultipartForm(_) => Box::pin(stream::once(async { + Err(crate::error::decode("multipart body cannot be streamed")) + })), + Inner::Streaming(streaming) => streaming.into_stream(), } } } diff --git a/src/wasm/response.rs b/src/wasm/response.rs index 52fad04ee..340753dee 100644 --- a/src/wasm/response.rs +++ b/src/wasm/response.rs @@ -2,16 +2,11 @@ use std::fmt; use bytes::Bytes; use http::{HeaderMap, StatusCode}; -use js_sys::Uint8Array; use url::Url; -use crate::wasm::AbortGuard; +use crate::{response::ResponseUrl, wasm::AbortGuard}; -#[cfg(feature = "stream")] -use wasm_bindgen::JsCast; - -#[cfg(feature = "stream")] -use futures_util::stream::{self, StreamExt}; +use super::Body; #[cfg(feature = "json")] use serde::de::DeserializeOwned; @@ -97,68 +92,23 @@ impl Response { /// Get the response text. pub async fn text(self) -> crate::Result { - let p = self - .http - .body() + Body::from_response(self.http.into_body(), self._abort) .text() - .map_err(crate::error::wasm) - .map_err(crate::error::decode)?; - let js_val = super::promise::(p) .await - .map_err(crate::error::decode)?; - if let Some(s) = js_val.as_string() { - Ok(s) - } else { - Err(crate::error::decode("response.text isn't string")) - } } /// Get the response as bytes pub async fn bytes(self) -> crate::Result { - let p = self - .http - .body() - .array_buffer() - .map_err(crate::error::wasm) - .map_err(crate::error::decode)?; - - let buf_js = super::promise::(p) + Body::from_response(self.http.into_body(), self._abort) + .bytes() .await - .map_err(crate::error::decode)?; - - let buffer = Uint8Array::new(&buf_js); - let mut bytes = vec![0; buffer.length() as usize]; - buffer.copy_to(&mut bytes); - Ok(bytes.into()) } /// Convert the response into a `Stream` of `Bytes` from the body. #[cfg(feature = "stream")] pub fn bytes_stream(self) -> impl futures_core::Stream> { - use futures_core::Stream; - use std::pin::Pin; - - let web_response = self.http.into_body(); - let abort = self._abort; - - if let Some(body) = web_response.body() { - let body = wasm_streams::ReadableStream::from_raw(body.unchecked_into()); - Box::pin(body.into_stream().map(move |buf_js| { - // Keep the abort guard alive as long as this stream is. - let _abort = &abort; - let buffer = Uint8Array::new( - &buf_js - .map_err(crate::error::wasm) - .map_err(crate::error::decode)?, - ); - let mut bytes = vec![0; buffer.length() as usize]; - buffer.copy_to(&mut bytes); - Ok(bytes.into()) - })) as Pin>>> - } else { - // If there's no body, return an empty stream. - Box::pin(stream::empty()) as Pin>>> - } + let body = Body::from_response(self.http.into_body(), self._abort); + body.bytes_stream() } // util methods @@ -193,3 +143,13 @@ impl fmt::Debug for Response { .finish() } } + +impl From for http::Response { + fn from(response: Response) -> http::Response { + let Response { http, _abort, url } = response; + let (mut parts, body) = http.into_parts(); + parts.extensions.insert(ResponseUrl(*url)); + let body = Body::from_response(body, _abort); + http::Response::from_parts(parts, body) + } +} diff --git a/tests/wasm_simple.rs b/tests/wasm_simple.rs index bd781e771..b7fed1c32 100644 --- a/tests/wasm_simple.rs +++ b/tests/wasm_simple.rs @@ -1,6 +1,8 @@ #![cfg(target_arch = "wasm32")] use std::time::Duration; +#[cfg(feature = "stream")] +use futures_util::StreamExt; use wasm_bindgen::prelude::*; use wasm_bindgen_test::*; wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); @@ -37,3 +39,32 @@ async fn request_with_timeout() { assert!(err.is_request()); assert!(err.is_timeout()); } + +#[wasm_bindgen_test] +async fn convert_response_into_http_response_body_bytes() { + let res = reqwest::get("https://hyper.rs").await.expect("fetch body"); + + let http_res: http::Response = res.into(); + let body = http_res.into_body(); + let bytes = body.bytes().await.expect("read body bytes"); + + assert!(!bytes.is_empty()); +} + +#[cfg(feature = "stream")] +#[wasm_bindgen_test] +async fn convert_response_into_http_response_body_stream() { + let res = reqwest::get("https://hyper.rs") + .await + .expect("fetch streaming bytes"); + + let http_res: http::Response = res.into(); + let mut stream = http_res.into_body().bytes_stream(); + let mut total = 0usize; + + while let Some(chunk) = stream.next().await { + total += chunk.expect("stream chunk").len(); + } + + assert!(total > 0); +}