Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ tower-service = "0.3"
futures-core = { version = "0.3.28", default-features = false }
futures-util = { version = "0.3.28", default-features = false, optional = true }
sync_wrapper = { version = "1.0", features = ["futures"] }
pin-project-lite = "0.2.11"

# Optional deps...

Expand All @@ -129,7 +130,6 @@ percent-encoding = "2.3"
tokio = { version = "1.0", default-features = false, features = ["net", "time"] }
tower = { version = "0.5.2", default-features = false, features = ["timeout", "util"] }
tower-http = { version = "0.6.5", default-features = false, features = ["follow-redirect"] }
pin-project-lite = "0.2.11"

# Optional deps...
rustls-pki-types = { version = "1.9.0", features = ["std"], optional = true }
Expand Down
14 changes: 10 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,11 @@ fn _assert_impls() {
assert_sync::<Client>();
assert_clone::<Client>();

assert_send::<Request>();
assert_send::<RequestBuilder>();
#[cfg(not(target_arch = "wasm32"))]
{
assert_send::<Request>();
assert_send::<RequestBuilder>();
}

#[cfg(not(target_arch = "wasm32"))]
{
Expand All @@ -344,8 +347,11 @@ fn _assert_impls() {
assert_send::<Error>();
assert_sync::<Error>();

assert_send::<Body>();
assert_sync::<Body>();
#[cfg(not(target_arch = "wasm32"))]
{
assert_send::<Body>();
assert_sync::<Body>();
}
}

if_hyper! {
Expand Down
83 changes: 83 additions & 0 deletions src/wasm/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ enum Inner {
/// MultipartForm holds a multipart/form-data body.
#[cfg(feature = "multipart")]
MultipartForm(Form),
#[cfg(feature = "stream")]
Streaming(Streaming),
}

#[derive(Clone)]
Expand Down Expand Up @@ -58,6 +60,15 @@ impl Single {
}
}

pub(crate) type BodyFuture =
std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), JsValue>> + 'static>>;

#[cfg(feature = "stream")]
pub(crate) struct Streaming {
write_fut: BodyFuture,
readable: web_sys::ReadableStream,
}

impl Body {
/// Returns a reference to the internal data of the `Body`.
///
Expand All @@ -68,6 +79,56 @@ impl Body {
Inner::Single(single) => Some(single.as_bytes()),
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => None,
#[cfg(feature = "stream")]
Inner::Streaming(_) => None,
}
}

/// Turn a futures `Stream` into a JS `ReadableStream`.
///
/// # Example
///
/// ```
/// # use reqwest::Body;
/// # use futures_util;
/// # fn main() {
/// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
/// Ok("hello"),
/// Ok(" "),
/// Ok("world"),
/// ];
///
/// let stream = futures_util::stream::iter(chunks);
///
/// let body = Body::wrap_stream(stream);
/// # }
/// ```
///
/// # Optional
///
/// This requires the `stream` feature to be enabled.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
pub fn wrap_stream<S>(stream: S) -> Body
where
S: futures_core::stream::TryStream + 'static,
S::Error: Into<Box<dyn std::error::Error>>,
Bytes: From<S::Ok>,
{
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use wasm_bindgen::{JsError, UnwrapThrowExt};

let transform_stream =
wasm_streams::TransformStream::from_raw(web_sys::TransformStream::new().unwrap_throw());
Body {
inner: Inner::Streaming(Streaming {
write_fut: stream
.map_ok(|b| Single::Bytes(b.into()).to_js_value())
.map_err(|err| JsValue::from(JsError::new(&err.into().to_string())))
.forward(transform_stream.writable().into_sink())
.boxed_local(),
readable: transform_stream.readable().into_raw(),
}),
}
}

Expand All @@ -80,6 +141,18 @@ impl Body {
let js_value: &JsValue = form_data.as_ref();
Ok(js_value.to_owned())
}
#[cfg(feature = "stream")]
Inner::Streaming(streaming) => Ok(streaming.readable.clone().into()),
}
}

pub(crate) fn into_future(self) -> Option<BodyFuture> {
match self.inner {
Inner::Single(_) => None,
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => None,
#[cfg(feature = "stream")]
Inner::Streaming(streaming) => Some(streaming.write_fut),
}
}

Expand All @@ -88,6 +161,8 @@ impl Body {
match &self.inner {
Inner::Single(single) => Some(single),
Inner::MultipartForm(_) => None,
#[cfg(feature = "stream")]
Inner::Streaming(_) => None,
}
}

Expand All @@ -109,6 +184,10 @@ impl Body {
Inner::MultipartForm(form) => Self {
inner: Inner::MultipartForm(form),
},
#[cfg(feature = "stream")]
Inner::Streaming(streaming) => Self {
inner: Inner::Streaming(streaming),
},
}
}

Expand All @@ -117,6 +196,8 @@ impl Body {
Inner::Single(single) => single.is_empty(),
#[cfg(feature = "multipart")]
Inner::MultipartForm(form) => form.is_empty(),
#[cfg(feature = "stream")]
Inner::Streaming(_) => false,
}
}

Expand All @@ -127,6 +208,8 @@ impl Body {
}),
#[cfg(feature = "multipart")]
Inner::MultipartForm(_) => None,
#[cfg(feature = "stream")]
Inner::Streaming(_) => None,
}
}
}
Expand Down
69 changes: 62 additions & 7 deletions src/wasm/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use std::convert::TryInto;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll};

use http::header::USER_AGENT;
use http::{HeaderMap, HeaderValue, Method};
use js_sys::{Promise, JSON};
use std::convert::TryInto;
use std::{fmt, future::Future, sync::Arc};
use pin_project_lite::pin_project;
use url::Url;
use wasm_bindgen::prelude::{wasm_bindgen, UnwrapThrowExt as _};

Expand Down Expand Up @@ -182,11 +188,46 @@ impl fmt::Debug for ClientBuilder {
}
}

pin_project! {
struct Pending {
#[pin]
body_fut: Option<super::body::BodyFuture>,
#[pin]
fetch: wasm_bindgen_futures::JsFuture,
}
}

impl Future for Pending {
type Output = Result<web_sys::Response, crate::error::BoxError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use wasm_bindgen::JsCast;

let mut this = self.project();
if let Some(body_fut) = this.body_fut.as_mut().as_pin_mut() {
if let Poll::Ready(res) = body_fut.poll(cx) {
this.body_fut.set(None);
if let Err(err) = res {
return Poll::Ready(Err(crate::error::wasm(err)));
}
}
}
Poll::Ready(
ready!(this.fetch.poll(cx))
.map_err(crate::error::wasm)
.and_then(|js_resp| {
js_resp
.dyn_into::<web_sys::Response>()
.map_err(|_js_val| "promise resolved to unexpected type".into())
}),
)
}
}

// Can use new methods in web-sys when requiring v0.2.93.
// > `init.method(m)` to `init.set_method(m)`
// For now, ignore their deprecation.
#[allow(deprecated)]
async fn fetch(req: Request) -> crate::Result<Response> {
async fn fetch(mut req: Request) -> crate::Result<Response> {
// Build the js Request
let mut init = web_sys::RequestInit::new();
init.method(req.method().as_str());
Expand Down Expand Up @@ -216,11 +257,22 @@ async fn fetch(req: Request) -> crate::Result<Response> {
init.credentials(creds);
}

if let Some(body) = req.body() {
let body_fut = if let Some(body) = req.body_mut().take() {
if !body.is_empty() {
init.body(Some(body.to_js_value()?.as_ref()));
let fut = body.into_future();
if fut.is_some() {
js_sys::Reflect::set(&init, &"duplex".into(), &"half".into())
.map_err(crate::error::wasm)
.map_err(crate::error::builder)?;
}
fut
} else {
None
}
}
} else {
None
};

let mut abort = AbortGuard::new()?;
if let Some(timeout) = req.timeout() {
Expand All @@ -233,8 +285,11 @@ async fn fetch(req: Request) -> crate::Result<Response> {
.map_err(crate::error::builder)?;

// Await the fetch() promise
let p = js_fetch(&js_req);
let js_resp = super::promise::<web_sys::Response>(p)
let pending = Pending {
body_fut,
fetch: js_fetch(&js_req).into(),
};
let js_resp = pending
.await
.map_err(|error| {
if error.to_string() == "JsValue(\"reqwest::errors::TimedOut\")" {
Expand Down