From 042c770603a212f22387807efe4fc672959df40c Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 19 Apr 2020 18:32:17 +0200 Subject: [PATCH] feat(body): remove Sync bound for Body::wrap_stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A stream wrapped into a Body previously needed to implement `Sync` so that the Body type implements this autotrait as well (which is needed due to limitations in async/await). Since a stream only offers one method that is called with an exclusive reference, this type is statically proven to be Sync already. In theory it should be fine to add an `unsafe impl Sync`, but this commit instead adds a SyncWrapper to enlist the compiler’s help in proving that this is (and remains) correct. This makes it easier to construct response bodies for client code. --- src/body/body.rs | 25 ++++---- src/common/mod.rs | 1 + src/common/sync_wrapper.rs | 115 +++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 15 deletions(-) create mode 100644 src/common/sync_wrapper.rs diff --git a/src/body/body.rs b/src/body/body.rs index 0c75f71991..355f0d8713 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -11,6 +11,7 @@ use futures_util::TryStreamExt; use http::HeaderMap; use http_body::{Body as HttpBody, SizeHint}; +use crate::common::sync_wrapper::SyncWrapper; use crate::common::{task, watch, Future, Never, Pin, Poll}; use crate::proto::h2::ping; use crate::proto::DecodedLength; @@ -42,13 +43,11 @@ enum Kind { content_length: DecodedLength, recv: h2::RecvStream, }, - // NOTE: This requires `Sync` because of how easy it is to use `await` - // while a borrow of a `Request` exists. - // - // See https://github.com/rust-lang/rust/issues/57017 #[cfg(feature = "stream")] Wrapped( - Pin>> + Send + Sync>>, + SyncWrapper< + Pin>> + Send>>, + >, ), } @@ -156,12 +155,12 @@ impl Body { #[cfg(feature = "stream")] pub fn wrap_stream(stream: S) -> Body where - S: Stream> + Send + Sync + 'static, + S: Stream> + Send + 'static, O: Into + 'static, E: Into> + 'static, { let mapped = stream.map_ok(Into::into).map_err(Into::into); - Body::new(Kind::Wrapped(Box::pin(mapped))) + Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) } /// Converts this `Body` into a `Future` of a pending HTTP upgrade. @@ -280,7 +279,7 @@ impl Body { }, #[cfg(feature = "stream")] - Kind::Wrapped(ref mut s) => match ready!(s.as_mut().poll_next(cx)) { + Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), None => Poll::Ready(None), }, @@ -402,16 +401,12 @@ impl Stream for Body { /// This function requires enabling the `stream` feature in your /// `Cargo.toml`. #[cfg(feature = "stream")] -impl From>> + Send + Sync>> - for Body -{ +impl From>> + Send>> for Body { #[inline] fn from( - stream: Box< - dyn Stream>> + Send + Sync, - >, + stream: Box>> + Send>, ) -> Body { - Body::new(Kind::Wrapped(stream.into())) + Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) } } diff --git a/src/common/mod.rs b/src/common/mod.rs index e436fe5e2d..d9d62bc2ba 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod exec; pub(crate) mod io; mod lazy; mod never; +pub(crate) mod sync_wrapper; pub(crate) mod task; pub(crate) mod watch; diff --git a/src/common/sync_wrapper.rs b/src/common/sync_wrapper.rs new file mode 100644 index 0000000000..1e4aa4039c --- /dev/null +++ b/src/common/sync_wrapper.rs @@ -0,0 +1,115 @@ +/* + * This is a copy of the sync_wrapper crate. + */ +//! A mutual exclusion primitive that relies on static type information only +//! +//! This library is inspired by [this discussion](https://internals.rust-lang.org/t/what-shall-sync-mean-across-an-await/12020/2). +#![doc(html_logo_url = "https://developer.actyx.com/img/logo.svg")] +#![doc(html_favicon_url = "https://developer.actyx.com/img/favicon.ico")] + +/// A mutual exclusion primitive that relies on static type information only +/// +/// In some cases synchronization can be proven statically: whenever you hold an exclusive `&mut` +/// reference, the Rust type system ensures that no other part of the program can hold another +/// reference to the data. Therefore it is safe to access it even if the current thread obtained +/// this reference via a channel. Whenever this is the case, the overhead of allocating and locking +/// a [`Mutex`] can be avoided by using this static version. +/// +/// One example where this is often applicable is [`Future`], which requires an exclusive reference +/// for its [`poll`] method: While a given `Future` implementation may not be safe to access by +/// multiple threads concurrently, the executor can only run the `Future` on one thread at any +/// given time, making it [`Sync`] in practice as long as the implementation is `Send`. You can +/// therefore use the sync wrapper to prove that your data structure is `Sync` even though it +/// contains such a `Future`. +/// +/// # Example +/// +/// ```ignore +/// use hyper::common::sync_wrapper::SyncWrapper; +/// use std::future::Future; +/// +/// struct MyThing { +/// future: SyncWrapper + Send>>, +/// } +/// +/// impl MyThing { +/// // all accesses to `self.future` now require an exclusive reference or ownership +/// } +/// +/// fn assert_sync() {} +/// +/// assert_sync::(); +/// ``` +/// +/// [`Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html +/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html +/// [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#method.poll +/// [`Sync`]: https://doc.rust-lang.org/std/marker/trait.Sync.html +#[repr(transparent)] +pub struct SyncWrapper(T); + +impl SyncWrapper { + /// Creates a new SyncWrapper containing the given value. + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let wrapped = SyncWrapper::new(42); + /// ``` + pub fn new(value: T) -> Self { + Self(value) + } + + /// Acquires a reference to the protected value. + /// + /// This is safe because it requires an exclusive reference to the wrapper. Therefore this method + /// neither panics nor does it return an error. This is in contrast to [`Mutex::get_mut`] which + /// returns an error if another thread panicked while holding the lock. It is not recommended + /// to send an exclusive reference to a potentially damaged value to another thread for further + /// processing. + /// + /// [`Mutex::get_mut`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.get_mut + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let mut wrapped = SyncWrapper::new(42); + /// let value = wrapped.get_mut(); + /// *value = 0; + /// assert_eq!(*wrapped.get_mut(), 0); + /// ``` + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Consumes this wrapper, returning the underlying data. + /// + /// This is safe because it requires ownership of the wrapper, aherefore this method will neither + /// panic nor does it return an error. This is in contrast to [`Mutex::into_inner`] which + /// returns an error if another thread panicked while holding the lock. It is not recommended + /// to send an exclusive reference to a potentially damaged value to another thread for further + /// processing. + /// + /// [`Mutex::into_inner`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.into_inner + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let mut wrapped = SyncWrapper::new(42); + /// assert_eq!(wrapped.into_inner(), 42); + /// ``` + #[allow(dead_code)] + pub fn into_inner(self) -> T { + self.0 + } +} + +// this is safe because the only operations permitted on this data structure require exclusive +// access or ownership +unsafe impl Sync for SyncWrapper {}