Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WeakShared. #2169

Merged
merged 1 commit into from
Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/future/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub use self::remote_handle::{Remote, RemoteHandle};
mod shared;
#[cfg(feature = "std")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::shared::Shared;
pub use self::shared::{Shared, WeakShared};

impl<T: ?Sized> FutureExt for T where T: Future {}

Expand Down
35 changes: 34 additions & 1 deletion futures-util/src/future/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};

/// Future for the [`shared`](super::FutureExt::shared) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
Expand All @@ -26,6 +26,9 @@ struct Notifier {
wakers: Mutex<Option<Slab<Option<Waker>>>>,
}

/// A weak reference to a [`Shared`] that can be upgraded much like an `Arc`.
pub struct WeakShared<Fut: Future>(Weak<Inner<Fut>>);

// The future itself is polled behind the `Arc`, so it won't be moved
// when `Shared` is moved.
impl<Fut: Future> Unpin for Shared<Fut> {}
Expand All @@ -45,6 +48,12 @@ impl<Fut: Future> fmt::Debug for Inner<Fut> {
}
}

impl<Fut: Future> fmt::Debug for WeakShared<Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WeakShared").finish()
}
}

enum FutureOrOutput<Fut: Future> {
Future(Fut),
Output(Fut::Output),
Expand Down Expand Up @@ -107,6 +116,16 @@ where
}
None
}

/// Creates a new [`WeakShared`] for this [`Shared`].
///
/// Returns [`None`] if it has already been polled to completion.
pub fn downgrade(&self) -> Option<WeakShared<Fut>> {
if let Some(inner) = self.inner.as_ref() {
return Some(WeakShared(Arc::downgrade(inner)));
}
None
}
}

impl<Fut> Inner<Fut>
Expand Down Expand Up @@ -314,3 +333,17 @@ impl ArcWake for Notifier {
}
}
}

impl<Fut: Future> WeakShared<Fut>
{
/// Attempts to upgrade this [`WeakShared`] into a [`Shared`].
///
/// Returns [`None`] if all clones of the [`Shared`] have been dropped or polled
/// to completion.
pub fn upgrade(&self) -> Option<Shared<Fut>> {
Some(Shared {
inner: Some(self.0.upgrade()?),
waker_key: NULL_WAKER_KEY,
})
}
}
2 changes: 1 addition & 1 deletion futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub use self::future::CatchUnwind;
pub use self::future::{Remote, RemoteHandle};

#[cfg(feature = "std")]
pub use self::future::Shared;
pub use self::future::{Shared, WeakShared};

mod try_future;
pub use self::try_future::{
Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ pub mod future {
#[cfg(feature = "std")]
pub use futures_util::future::{
Remote, RemoteHandle,
CatchUnwind, Shared,
CatchUnwind, Shared, WeakShared,
};
}

Expand Down
28 changes: 28 additions & 0 deletions futures/tests/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,34 @@ fn peek() {
}
}

#[test]
fn downgrade() {
use futures::channel::oneshot;
use futures::executor::block_on;
use futures::future::FutureExt;

let (tx, rx) = oneshot::channel::<i32>();
let shared = rx.shared();
// Since there are outstanding `Shared`s, we can get a `WeakShared`.
let weak = shared.downgrade().unwrap();
// It should upgrade fine right now.
let mut shared2 = weak.upgrade().unwrap();

tx.send(42).unwrap();
assert_eq!(block_on(shared).unwrap(), 42);

// We should still be able to get a new `WeakShared` and upgrade it
// because `shared2` is outstanding.
assert!(shared2.downgrade().is_some());
assert!(weak.upgrade().is_some());

assert_eq!(block_on(&mut shared2).unwrap(), 42);
// Now that all `Shared`s have been exhausted, we should not be able
// to get a new `WeakShared` or upgrade an existing one.
assert!(weak.upgrade().is_none());
assert!(shared2.downgrade().is_none());
}

#[test]
fn dont_clone_in_single_owner_shared_future() {
use futures::channel::oneshot;
Expand Down