Skip to content

Commit

Permalink
feat(body): remove Sync bound for Body::wrap_stream
Browse files Browse the repository at this point in the history
A stream wrapped into a Body previously needed to implement `Sync` so
that the Body type implements this autotrait as well (which is needed
due to limitations in async/await). Since a stream only offers one
method that is called with an exclusive reference, this type is
statically proven to be Sync already. In theory it should be fine to add
an `unsafe impl Sync`, but this commit instead adds a SyncWrapper to
enlist the compiler’s help in proving that this is (and remains) correct.

This makes it easier to construct response bodies for client code.
  • Loading branch information
rkuhn authored and seanmonstar committed May 19, 2020
1 parent d5b0ee5 commit 042c770
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 15 deletions.
25 changes: 10 additions & 15 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures_util::TryStreamExt;
use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint};

use crate::common::sync_wrapper::SyncWrapper;
use crate::common::{task, watch, Future, Never, Pin, Poll};
use crate::proto::h2::ping;
use crate::proto::DecodedLength;
Expand Down Expand Up @@ -42,13 +43,11 @@ enum Kind {
content_length: DecodedLength,
recv: h2::RecvStream,
},
// NOTE: This requires `Sync` because of how easy it is to use `await`
// while a borrow of a `Request<Body>` exists.
//
// See https://github.com/rust-lang/rust/issues/57017
#[cfg(feature = "stream")]
Wrapped(
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>,
SyncWrapper<
Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
>,
),
}

Expand Down Expand Up @@ -156,12 +155,12 @@ impl Body {
#[cfg(feature = "stream")]
pub fn wrap_stream<S, O, E>(stream: S) -> Body
where
S: Stream<Item = Result<O, E>> + Send + Sync + 'static,
S: Stream<Item = Result<O, E>> + Send + 'static,
O: Into<Bytes> + 'static,
E: Into<Box<dyn StdError + Send + Sync>> + 'static,
{
let mapped = stream.map_ok(Into::into).map_err(Into::into);
Body::new(Kind::Wrapped(Box::pin(mapped)))
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}

/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
Expand Down Expand Up @@ -280,7 +279,7 @@ impl Body {
},

#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => match ready!(s.as_mut().poll_next(cx)) {
Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
None => Poll::Ready(None),
},
Expand Down Expand Up @@ -402,16 +401,12 @@ impl Stream for Body {
/// This function requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
for Body
{
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
#[inline]
fn from(
stream: Box<
dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send + Sync,
>,
stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
) -> Body {
Body::new(Kind::Wrapped(stream.into()))
Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
}
}

Expand Down
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) mod exec;
pub(crate) mod io;
mod lazy;
mod never;
pub(crate) mod sync_wrapper;
pub(crate) mod task;
pub(crate) mod watch;

Expand Down
115 changes: 115 additions & 0 deletions src/common/sync_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* This is a copy of the sync_wrapper crate.
*/
//! A mutual exclusion primitive that relies on static type information only
//!
//! This library is inspired by [this discussion](https://internals.rust-lang.org/t/what-shall-sync-mean-across-an-await/12020/2).
#![doc(html_logo_url = "https://developer.actyx.com/img/logo.svg")]
#![doc(html_favicon_url = "https://developer.actyx.com/img/favicon.ico")]

/// A mutual exclusion primitive that relies on static type information only
///
/// In some cases synchronization can be proven statically: whenever you hold an exclusive `&mut`
/// reference, the Rust type system ensures that no other part of the program can hold another
/// reference to the data. Therefore it is safe to access it even if the current thread obtained
/// this reference via a channel. Whenever this is the case, the overhead of allocating and locking
/// a [`Mutex`] can be avoided by using this static version.
///
/// One example where this is often applicable is [`Future`], which requires an exclusive reference
/// for its [`poll`] method: While a given `Future` implementation may not be safe to access by
/// multiple threads concurrently, the executor can only run the `Future` on one thread at any
/// given time, making it [`Sync`] in practice as long as the implementation is `Send`. You can
/// therefore use the sync wrapper to prove that your data structure is `Sync` even though it
/// contains such a `Future`.
///
/// # Example
///
/// ```ignore
/// use hyper::common::sync_wrapper::SyncWrapper;
/// use std::future::Future;
///
/// struct MyThing {
/// future: SyncWrapper<Box<dyn Future<Output = String> + Send>>,
/// }
///
/// impl MyThing {
/// // all accesses to `self.future` now require an exclusive reference or ownership
/// }
///
/// fn assert_sync<T: Sync>() {}
///
/// assert_sync::<MyThing>();
/// ```
///
/// [`Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
/// [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#method.poll
/// [`Sync`]: https://doc.rust-lang.org/std/marker/trait.Sync.html
#[repr(transparent)]
pub struct SyncWrapper<T>(T);

impl<T> SyncWrapper<T> {
/// Creates a new SyncWrapper containing the given value.
///
/// # Examples
///
/// ```ignore
/// use hyper::common::sync_wrapper::SyncWrapper;
///
/// let wrapped = SyncWrapper::new(42);
/// ```
pub fn new(value: T) -> Self {
Self(value)
}

/// Acquires a reference to the protected value.
///
/// This is safe because it requires an exclusive reference to the wrapper. Therefore this method
/// neither panics nor does it return an error. This is in contrast to [`Mutex::get_mut`] which
/// returns an error if another thread panicked while holding the lock. It is not recommended
/// to send an exclusive reference to a potentially damaged value to another thread for further
/// processing.
///
/// [`Mutex::get_mut`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.get_mut
///
/// # Examples
///
/// ```ignore
/// use hyper::common::sync_wrapper::SyncWrapper;
///
/// let mut wrapped = SyncWrapper::new(42);
/// let value = wrapped.get_mut();
/// *value = 0;
/// assert_eq!(*wrapped.get_mut(), 0);
/// ```
pub fn get_mut(&mut self) -> &mut T {
&mut self.0
}

/// Consumes this wrapper, returning the underlying data.
///
/// This is safe because it requires ownership of the wrapper, aherefore this method will neither
/// panic nor does it return an error. This is in contrast to [`Mutex::into_inner`] which
/// returns an error if another thread panicked while holding the lock. It is not recommended
/// to send an exclusive reference to a potentially damaged value to another thread for further
/// processing.
///
/// [`Mutex::into_inner`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.into_inner
///
/// # Examples
///
/// ```ignore
/// use hyper::common::sync_wrapper::SyncWrapper;
///
/// let mut wrapped = SyncWrapper::new(42);
/// assert_eq!(wrapped.into_inner(), 42);
/// ```
#[allow(dead_code)]
pub fn into_inner(self) -> T {
self.0
}
}

// this is safe because the only operations permitted on this data structure require exclusive
// access or ownership
unsafe impl<T: Send> Sync for SyncWrapper<T> {}

0 comments on commit 042c770

Please sign in to comment.