Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove argument from closure in thread::Scope::spawn. #94559

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions library/core/src/sync/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ impl AtomicBool {
/// let a = &*AtomicBool::from_mut_slice(&mut some_bools);
/// std::thread::scope(|s| {
/// for i in 0..a.len() {
/// s.spawn(move |_| a[i].store(true, Ordering::Relaxed));
/// s.spawn(move || a[i].store(true, Ordering::Relaxed));
/// }
/// });
/// assert_eq!(some_bools, [true; 10]);
Expand Down Expand Up @@ -984,7 +984,7 @@ impl<T> AtomicPtr<T> {
/// let a = &*AtomicPtr::from_mut_slice(&mut some_ptrs);
/// std::thread::scope(|s| {
/// for i in 0..a.len() {
/// s.spawn(move |_| {
/// s.spawn(move || {
/// let name = Box::new(format!("thread{i}"));
/// a[i].store(Box::into_raw(name), Ordering::Relaxed);
/// });
Expand Down Expand Up @@ -1533,7 +1533,7 @@ macro_rules! atomic_int {
#[doc = concat!("let a = &*", stringify!($atomic_type), "::from_mut_slice(&mut some_ints);")]
/// std::thread::scope(|s| {
/// for i in 0..a.len() {
/// s.spawn(move |_| a[i].store(i as _, Ordering::Relaxed));
/// s.spawn(move || a[i].store(i as _, Ordering::Relaxed));
/// }
/// });
/// for (i, n) in some_ints.into_iter().enumerate() {
Expand Down
44 changes: 23 additions & 21 deletions library/std/src/thread/scoped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,24 @@ use crate::sync::Arc;
/// A scope to spawn scoped threads in.
///
/// See [`scope`] for details.
pub struct Scope<'env> {
pub struct Scope<'scope, 'env: 'scope> {
data: ScopeData,
/// Invariance over 'env, to make sure 'env cannot shrink,
/// Invariance over 'scope, to make sure 'scope cannot shrink,
/// which is necessary for soundness.
///
/// Without invariance, this would compile fine but be unsound:
///
/// ```compile_fail
/// ```compile_fail,E0373
/// #![feature(scoped_threads)]
///
/// std::thread::scope(|s| {
/// s.spawn(|s| {
/// s.spawn(|| {
/// let a = String::from("abcd");
/// s.spawn(|_| println!("{:?}", a)); // might run after `a` is dropped
/// s.spawn(|| println!("{:?}", a)); // might run after `a` is dropped
/// });
/// });
/// ```
scope: PhantomData<&'scope mut &'scope ()>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see the documentation expanded here as to the meaning of 'scope and 'env, likely drawing from the commentary on #93203 (comment) and this PR.

Part of me wants to say that we should also add some kind of user-visible note about 'scope/'env -- it seems like this is an API that we would like new users to Rust to go "cool!" about, and I think the current signature will make them afraid that all work in Rust with threads requires complicated lifetime signatures that look pretty opaque -- even to experienced Rustaceans, I suspect :) I'm not sure if putting the note on this type and then linking to it from the other APIs which use Scope is right -- that would be my first instinct, but the more complicated signature is on those APIs (with for<'scope>... and all).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we probably should, but I'm not sure how yet. I've added it to the tracking issue.

Copy link
Contributor

@danielhenrymantilla danielhenrymantilla Mar 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I have an idea of renaming stuff that could improve stuff, but I was gonna officially suggest it in a follow-up PR, but the gist of it would be:

Current Suggested Notes
thread::scope() thread::scoped()
or
thread::with_scope()
debatable nit
'env 'scope I don't feel strongly about this one either
'scope 'spawner but I do for this one
Scope ScopedSpawner
or just
Spawner
and thus for this one as well
fn scoped<'scope, R> (
    // `'spawner` is a higher-order lifetime parameter, thus it may not escape the scope
    // of the closure's body: the spawner is effectively "caged" within this function call.
    f: impl for<'spawner> FnOnce(&'spawner ScopedSpawner<'spawner, 'scope>) -> R
) -> R
impl<'spawner, '_> ScopedSpawner<'spawner, '_> {
    fn spawn<R, ThreadBody>(
        &'spawner self,
        thread_body: ThreadBody,
    ) -> JoinHandle<'spawner, T>
    where
        // ThreadBody represents logic / a computation that produces some `R`.
        ThreadBody : FnOnce() -> R,
        // Such computation may (and will!) be run in another thread...
        ThreadBody : Send,
        // ... and thus it may be used / be running for as long as the `ScopedSpawner` is.
        ThreadBody : 'spawner,
        // The computed value needs to be able to come back to our thread.
        R : Send,
        // And its implicit drop glue may act as a computation of its own, that may thus
        // be run while a `ScopedSpawner` is around.
        R : 'spawner,

Funnily enough, by phrasing the F : 'spawner bound as the straightforward "it may be used / may be running while the ScopedSpawner is alive"1, it turns out we don't need to think about the universal quantification of 'scope : 'env and all those more complicated considerations. We just have a thread that has to be able to be run(ning) / used while the ScopedSpawner, the handle that ensures the thread is joined, is itself dropped.

It is really the main key point.

Then, and only then, you may consider going further in the reasoning:

We can see that:

impl<'spawner> ScopedSpawner<'spawner, '_> {
    fn spawn(
       self: &'_ Self,
       thread: impl 'spawner + Send + FnOnce()...,

is not that different from the more natural:

impl<'spawner> ScopedSpawner<'spawner> {
    fn spawn(
        self: &'_ Self,
        thread: impl 'spawner + Send + FnOnce()...,
Why "natural"

Indeed, if we stare at the latter, it's actually the pre-leakpocalypse API.

And the bound / logic itself, modulo Send, is actually the same as imagining a collection of drop glues / of defers, since joining a thread on drop is not that different from running that thread's logic directly on drop:

struct MultiDefer<'multi_defer> /* = */ (
    Vec<Box<dyn 'multi_defer+ FnOnce()>>,
);
impl Drop for MultiDefer<'_> { fn drop(&mut self) {
    for f in mem::take(self.0) { f(); }
}}

/// Look, same bounds as the pre-leakpocalypse API (but for Send)
impl<'multi_defer> MultiDefer<'multi_defer> {
    fn add(&mut self, f: impl 'multi_defer + FnOnce())
    {
       self.0.push(Box::new(f))

We "just" have to add that extra "upper bound" on 'spawner for convenience, since a post-leakpocalypse API needs a higher-order lifetime, and a higher-order lifetime, by default, not only expresses a local/caged lifetime, but it also covers the range of 'static or other big lifetimes (our spawner won't be living for that long!). Hence the need to opt-out of that "range of big lifetimes", and use a for<'spawner where 'spawner : 'scope> quantification. That where clause can't be written there and then in current Rust, so it is written inside ScopedSpawner's definition, which thus needs to carry that otherwise useless extra lifetime parameter.

Footnotes

  1. which is actually almost exactly what happens –just a bit too conservative, since there is a tiny instant between the moment where all the threads are joined and their non-collected computed values are dropped and the moment the ScopedSpawner itself finally dies

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should rename Scope to Spawner, because I imagine Scope might be able to do more than just spawning threads in the future. For example, I can imagine something like scope.is_panicked() to check if any of the threads panicked. Or scope.num_running_threads() to see how many threads are currently still running in this scope. Etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But let's have that discussion on the tracking issue. ^^

env: PhantomData<&'env mut &'env ()>,
}

Expand Down Expand Up @@ -88,12 +89,12 @@ impl ScopeData {
/// let mut x = 0;
///
/// thread::scope(|s| {
/// s.spawn(|_| {
/// s.spawn(|| {
/// println!("hello from the first scoped thread");
/// // We can borrow `a` here.
/// dbg!(&a);
/// });
/// s.spawn(|_| {
/// s.spawn(|| {
/// println!("hello from the second scoped thread");
/// // We can even mutably borrow `x` here,
/// // because no other threads are using it.
Expand All @@ -109,7 +110,7 @@ impl ScopeData {
#[track_caller]
pub fn scope<'env, F, T>(f: F) -> T
where
F: FnOnce(&Scope<'env>) -> T,
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
{
let scope = Scope {
data: ScopeData {
Expand All @@ -118,6 +119,7 @@ where
a_thread_panicked: AtomicBool::new(false),
},
env: PhantomData,
scope: PhantomData,
};

// Run `f`, but catch panics so we can make sure to wait for all the threads to join.
Expand All @@ -138,7 +140,7 @@ where
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR, but it seems worrying that if park() panics we get unsoundness -- it seems like we should be using a drop guard or similar rather than catch_unwind?

(On non-futex or windows platforms, the generic parker implementation has some clear panic points, though they should not happen in theory).

But maybe that kind of defensiveness isn't worthwhile -- anyway, just a passing thought as I wrap my head around the impl here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably just change those implementations and guarantee that park() never panics.

}

impl<'env> Scope<'env> {
impl<'scope, 'env> Scope<'scope, 'env> {
/// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
///
/// Unlike non-scoped threads, threads spawned with this function may
Expand All @@ -163,10 +165,10 @@ impl<'env> Scope<'env> {
/// to recover from such errors.
///
/// [`join`]: ScopedJoinHandle::join
pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
where
F: FnOnce(&Scope<'env>) -> T + Send + 'env,
T: Send + 'env,
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
Copy link
Contributor

@danielhenrymantilla danielhenrymantilla Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be needed for the return value to be allowed to be as small as 'scope

Suggested change
T: Send + 'scope,
T: Send + 'env,

mainly, I'd find it less problematic if s.spawn(|| s) were denied.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s.spawn(|| s) seems to be denied in both cases.

Do you have an example of any difference that this change makes?

Copy link
Contributor

@danielhenrymantilla danielhenrymantilla Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that was unexpected; it looks like the edition-2021 magic fails when the capture is used "by value" but with fully inferred types (the *&& -> & place flattening does not seem to occur if the *&& place starts off inferred). This includes a move in return position.

Thus, using move || s or || -> &_ { s } are both examples that do pass when T is only restricted to be : 'scope.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a problem if s.spawn(move || s) works? I can't really think of a good use case, but I'd prefer to not unnecessarily add more restrictions to the return type than necessary. I also think the signature is easier to follow if every lifetime is 'scope without having to wonder why F is 'scope but T is 'env.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 It all boils down to what does the "scoped runtime" do with the returned value when not joined / and potentially when leaked —I suspect the latter is the only actual area of problems, and the current implementation leaks stuff in that case, so at first glance it looks like it's gonna be fine.

  • If s can be returned, then scopeguard::guard(s, |s| s.spawn(…)) could also be the value of type T, and then it could be problematic to have such drop logic run within the runtime's auto-join cleanup etc.. Basically by using 'scope we are giving more flexibility to the caller, who oughtn't realistically need it, and which could impair a bit future changes to the implementation so as not to cause unsoundness within that contrived use case.

  • So, conservatively, using 'env is the more cautious and future-proof approach; we could always loosen it afterwards. But I agree I haven't delved too much into "all potential cleanup implementations", and that it is thus also possible that there be no problem whatsoever with that extra flexibility.

To recap:

  • there could be future-proofing, soundness-wise, issues with the more lenient : 'scope bound (but it may also be fine);
  • the signature is a bit more complex with F : 'scope on one end, and T : 'env on the other.

So, one thing that can be done is sticking to T : 'scope for now, but adding a mention to this to the tracking issue so that we remember to think about it before stabilization 🙂

Copy link
Member Author

@m-ou-se m-ou-se Mar 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some more time thinking about this, and concluded that using 'scope is actually unsound with the current implementation, since the active thread counter is decremented before dropping an ignored return value:

#![feature(scoped_threads)]

use std::{
    thread::{scope, sleep},
    time::Duration,
};

fn main() {
    let mut a = "asdf".to_string();
    let r = &a;
    scope(|s| {
        s.spawn(move || {
            scopeguard::guard(s, move |s| {
                sleep(Duration::from_millis(100));
                s.spawn(move || dbg!(r));
            })
        });
    });
    drop(a);
    sleep(Duration::from_millis(200));
}

With std from this PR:

[src/main.rs:15] r = "�K\u{f}\\"

Eep!

However! After thinking a bit more, I concluded that using 'env doesn't fix things. It is still unsound with 'env, and in fact even the code before the change in this PR is unsound:

fn main() {
    let mut a = "asdf".to_string();
    let r = &a;
    scope(|s| {
        s.spawn(move || {
            scopeguard::guard(r, move |r| {
                sleep(Duration::from_millis(100));
                dbg!(r);
            })
        });
    });
    drop(a);
    sleep(Duration::from_millis(200));
}

On Rust rustc 1.61.0-nightly (8769f4e 2022-03-02):

[src/main.rs:15] r = "!\u{1a}\u{7f}c"

This makes me think that it's unlikely that an implementation would be unsound with the 'scope bound, but not with the 'env bound, since it needs to handle this last example anyway.

(The fix is to drop the return value before marking a thread as joined. So, an implementation issue, not an API issue. I'll send a PR for that.)

Copy link
Contributor

@danielhenrymantilla danielhenrymantilla Mar 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! 🙂


So, I've been thinking about this, to fully spell out what my hunch / feeling of uneasyness was.

Let's start with the following new names, for the remainder of this post:

  • : 'env is : CanReferToSuffOutsideTheScopeButNotTheSpawner:
    • renamed 'cannot_capture_inside_scope in what follows
  • 'scope is CanAlsoSpawnScopedThreads
    • renamed 'cannot_capture_inside_scope_except_spawner in what follows (together with Scope becoming ScopedSpawner).

So from a start point of F and T both being CanReferToSuffOutsideTheScopeButNotTheSpawner, we loosen it down to CanAlsoSpawnScopedThreads capability to F, since we are removing the raw spawner handle it used to receive as a parameter.

- F : 'cannot_capture_inside_scope + FnOnce(ScopedSpawner...) -> T,
+ F : 'cannot_capture_inside_scope_except_spawner + FnOnce() -> T, // equivalent API
  • (This change is actually a very good summary of this whole PR, btw 😄)

Now, you are also giving the CanAlsoSpawnScopedThreads capability to the return value of the closure.

- T : 'cannot_capture_inside_scope,
+ T : 'cannot_capture_inside_scope_except_spawner, // new capability
  • If such value is obtained through an explicit .join(), then that capability becomes pointless since the spawner handle itself is also available.

  • Else, the return value is no longer accessible to the user, and thus its sole API being called, if any, will be its drop glue.

    • If the value is leaked, then such capability becomes pointless as well, since not called.
    • Else, the value is dropped around the join-point (but before it, once your PR fixing the soundness bug is merged).

So what granting it the CanAlsoSpawnScopedThreads capability achieves is allowing this implicit drop, right before the join point cleanup, to spawn extra scoped threads, leading to thread count changes etc.

  • This is an extra feature of the API, so it can be beneficial if we deem it to be a useful feature to have;

  • It's also a maintenance burden on the scoped threads API, since any time the join logic is touched, implementors will need to be mindful of that rough edge:

    impl<'spawner, T> Drop for Packet<'spawner, T> {
        fn drop(&mut self) {
            // ...
            {
                // Drop the closure's return value: THIS MAY SPAWN EXTRA THREADS
                *self.result.get_mut() = None;
            // ...
            self.spawner.decrement_num_running_threads(...)

With the caveat that mistakes here endanger soundness, and that removing this CanAlsoSpawnScopedThreads capability would be a breaking change.


  • So, it's not, granted, that much of extra cognitive burden when dealing with the implementation, but it is some.
  • However, the obtained flexibility does feel incredibly niche, imho (moreover, even if I were wrong and that capability were useful, then we could always add it later on: it wouldn't be a breaking change).
  • Remains the argument of "symmetry in the lifetime bounds, for a simpler function signature", which to me is outweighed by the "caution, extra threads may be spawned during drop glue" subtlety for implementors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Drop the closure's return value: THIS MAY SPAWN EXTRA THREADS

I wonder if there's any reasonable implementation where this requires special attention. The point I was trying to make towards the end of my last comment is that implementations need to take care of handling Drop implementations that borrow from 'env anyway, which means the drop needs to happen before any kind of 'joining' happens, which also covers the case where someone spawns more threads from the Drop impl.

I don't think there is really any difference between spawning threads near the end of F, versus doing things in Ts Drop impl, from the perspective of what needs to be taken into account for soundness/correctness. In both cases it's just a thread spawning more threads before it gets joined.

Copy link
Contributor

@danielhenrymantilla danielhenrymantilla Mar 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point I was trying to make

Yeah I guess I didn't get it 100% the first time 😅; I remained convinced it would lead to a more subtle approach, but as you've patiently mentioned, the subtlety is already there even with 'env, as in "the true F is actually F + drop(T)".

Thanks for having put up with my initial skepticism! 🙏

Silly GH won't let me mark my own remark as resolved 🤦

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the true F is actually F + drop(T)"

Yeah, that's a great way of putting it.

{
Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
}
Expand Down Expand Up @@ -196,7 +198,7 @@ impl Builder {
/// thread::scope(|s| {
/// thread::Builder::new()
/// .name("first".to_string())
/// .spawn_scoped(s, |_|
/// .spawn_scoped(s, ||
/// {
/// println!("hello from the {:?} scoped thread", thread::current().name());
/// // We can borrow `a` here.
Expand All @@ -205,7 +207,7 @@ impl Builder {
/// .unwrap();
/// thread::Builder::new()
/// .name("second".to_string())
/// .spawn_scoped(s, |_|
/// .spawn_scoped(s, ||
/// {
/// println!("hello from the {:?} scoped thread", thread::current().name());
/// // We can even mutably borrow `x` here,
Expand All @@ -222,14 +224,14 @@ impl Builder {
/// ```
pub fn spawn_scoped<'scope, 'env, F, T>(
self,
scope: &'scope Scope<'env>,
scope: &'scope Scope<'scope, 'env>,
f: F,
) -> io::Result<ScopedJoinHandle<'scope, T>>
where
F: FnOnce(&Scope<'env>) -> T + Send + 'env,
T: Send + 'env,
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
{
Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(|| f(scope), Some(&scope.data)) }?))
Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(&scope.data)) }?))
}
}

Expand All @@ -245,7 +247,7 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> {
/// use std::thread;
///
/// thread::scope(|s| {
/// let t = s.spawn(|_| {
/// let t = s.spawn(|| {
/// println!("hello");
/// });
/// println!("thread id: {:?}", t.thread().id());
Expand Down Expand Up @@ -279,7 +281,7 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> {
/// use std::thread;
///
/// thread::scope(|s| {
/// let t = s.spawn(|_| {
/// let t = s.spawn(|| {
/// panic!("oh no");
/// });
/// assert!(t.join().is_err());
Expand All @@ -299,7 +301,7 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> {
}
}

impl<'env> fmt::Debug for Scope<'env> {
impl fmt::Debug for Scope<'_, '_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Scope")
.field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
Expand Down