Skip to content

Commit

Permalink
Rollup merge of rust-lang#132730 - joboet:after_main_sync, r=Noratrieb
Browse files Browse the repository at this point in the history
std: allow after-main use of synchronization primitives

By creating an unnamed thread handle when the actual one has already been destroyed, synchronization primitives using thread parking can be used even outside the Rust runtime.

This also fixes an inefficiency in the queue-based `RwLock`: if `thread::current` was not initialized yet, it will create a new handle on every parking attempt without initializing `thread::current`. The private `current_or_unnamed` function introduced here fixes this.
  • Loading branch information
jieyouxu authored Nov 24, 2024
2 parents 6ffa455 + 05fecb9 commit 895f290
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 26 deletions.
3 changes: 0 additions & 3 deletions std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,6 @@
//!
//! - after-main use of thread-locals, which also affects additional features:
//! - [`thread::current()`]
//! - [`thread::scope()`]
//! - [`sync::mpmc`]
//! - [`sync::mpsc`]
//! - before-main stdio file descriptors are not guaranteed to be open on unix platforms
//!
//!
Expand Down
6 changes: 4 additions & 2 deletions std/src/sync/mpmc/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ impl<T> Channel<T> {
}

// Block the current thread.
let sel = cx.wait_until(deadline);
// SAFETY: the context belongs to the current thread.
let sel = unsafe { cx.wait_until(deadline) };

match sel {
Selected::Waiting => unreachable!(),
Expand Down Expand Up @@ -397,7 +398,8 @@ impl<T> Channel<T> {
}

// Block the current thread.
let sel = cx.wait_until(deadline);
// SAFETY: the context belongs to the current thread.
let sel = unsafe { cx.wait_until(deadline) };

match sel {
Selected::Waiting => unreachable!(),
Expand Down
13 changes: 9 additions & 4 deletions std/src/sync/mpmc/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Context {
inner: Arc::new(Inner {
select: AtomicUsize::new(Selected::Waiting.into()),
packet: AtomicPtr::new(ptr::null_mut()),
thread: thread::current(),
thread: thread::current_or_unnamed(),
thread_id: current_thread_id(),
}),
}
Expand Down Expand Up @@ -112,8 +112,11 @@ impl Context {
/// Waits until an operation is selected and returns it.
///
/// If the deadline is reached, `Selected::Aborted` will be selected.
///
/// # Safety
/// This may only be called from the thread this `Context` belongs to.
#[inline]
pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
pub unsafe fn wait_until(&self, deadline: Option<Instant>) -> Selected {
loop {
// Check whether an operation has been selected.
let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
Expand All @@ -126,7 +129,8 @@ impl Context {
let now = Instant::now();

if now < end {
thread::park_timeout(end - now);
// SAFETY: guaranteed by caller.
unsafe { self.inner.thread.park_timeout(end - now) };
} else {
// The deadline has been reached. Try aborting select.
return match self.try_select(Selected::Aborted) {
Expand All @@ -135,7 +139,8 @@ impl Context {
};
}
} else {
thread::park();
// SAFETY: guaranteed by caller.
unsafe { self.inner.thread.park() };
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion std/src/sync/mpmc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ impl<T> Channel<T> {
}

// Block the current thread.
let sel = cx.wait_until(deadline);
// SAFETY: the context belongs to the current thread.
let sel = unsafe { cx.wait_until(deadline) };

match sel {
Selected::Waiting => unreachable!(),
Expand Down
6 changes: 4 additions & 2 deletions std/src/sync/mpmc/zero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ impl<T> Channel<T> {
drop(inner);

// Block the current thread.
let sel = cx.wait_until(deadline);
// SAFETY: the context belongs to the current thread.
let sel = unsafe { cx.wait_until(deadline) };

match sel {
Selected::Waiting => unreachable!(),
Expand Down Expand Up @@ -257,7 +258,8 @@ impl<T> Channel<T> {
drop(inner);

// Block the current thread.
let sel = cx.wait_until(deadline);
// SAFETY: the context belongs to the current thread.
let sel = unsafe { cx.wait_until(deadline) };

match sel {
Selected::Waiting => unreachable!(),
Expand Down
9 changes: 5 additions & 4 deletions std/src/sys/sync/once/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ const QUEUE_MASK: usize = !STATE_MASK;
// use interior mutability.
#[repr(align(4))] // Ensure the two lower bits are free to use as state bits.
struct Waiter {
thread: Cell<Option<Thread>>,
thread: Thread,
signaled: AtomicBool,
next: Cell<*const Waiter>,
}
Expand Down Expand Up @@ -238,7 +238,7 @@ fn wait(
return_on_poisoned: bool,
) -> StateAndQueue {
let node = &Waiter {
thread: Cell::new(Some(thread::current())),
thread: thread::current_or_unnamed(),
signaled: AtomicBool::new(false),
next: Cell::new(ptr::null()),
};
Expand Down Expand Up @@ -277,7 +277,8 @@ fn wait(
// can park ourselves, the result could be this thread never gets
// unparked. Luckily `park` comes with the guarantee that if it got
// an `unpark` just before on an unparked thread it does not park.
thread::park();
// SAFETY: we retrieved this handle on the current thread above.
unsafe { node.thread.park() }
}

return state_and_queue.load(Acquire);
Expand Down Expand Up @@ -309,7 +310,7 @@ impl Drop for WaiterQueue<'_> {
let mut queue = to_queue(current);
while !queue.is_null() {
let next = (*queue).next.get();
let thread = (*queue).thread.take().unwrap();
let thread = (*queue).thread.clone();
(*queue).signaled.store(true, Release);
thread.unpark();
queue = next;
Expand Down
6 changes: 2 additions & 4 deletions std/src/sys/sync/rwlock/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ use crate::mem;
use crate::ptr::{self, NonNull, null_mut, without_provenance_mut};
use crate::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use crate::sync::atomic::{AtomicBool, AtomicPtr};
use crate::thread::{self, Thread, ThreadId};
use crate::thread::{self, Thread};

/// The atomic lock state.
type AtomicState = AtomicPtr<()>;
Expand Down Expand Up @@ -217,9 +217,7 @@ impl Node {
/// Prepare this node for waiting.
fn prepare(&mut self) {
// Fall back to creating an unnamed `Thread` handle to allow locking in TLS destructors.
self.thread.get_or_init(|| {
thread::try_current().unwrap_or_else(|| Thread::new_unnamed(ThreadId::new()))
});
self.thread.get_or_init(thread::current_or_unnamed);
self.completed = AtomicBool::new(false);
}

Expand Down
17 changes: 17 additions & 0 deletions std/src/thread/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,23 @@ pub(crate) fn try_current() -> Option<Thread> {
}
}

/// Gets a handle to the thread that invokes it. If the handle stored in thread-
/// local storage was already destroyed, this creates a new unnamed temporary
/// handle to allow thread parking in nearly all situations.
pub(crate) fn current_or_unnamed() -> Thread {
let current = CURRENT.get();
if current > DESTROYED {
unsafe {
let current = ManuallyDrop::new(Thread::from_raw(current));
(*current).clone()
}
} else if current == DESTROYED {
Thread::new_unnamed(id::get_or_init())
} else {
init_current(current)
}
}

/// Gets a handle to the thread that invokes it.
///
/// # Examples
Expand Down
15 changes: 12 additions & 3 deletions std/src/thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ mod current;

#[stable(feature = "rust1", since = "1.0.0")]
pub use current::current;
pub(crate) use current::{current_id, drop_current, set_current, try_current};
pub(crate) use current::{current_id, current_or_unnamed, drop_current, set_current, try_current};

mod spawnhook;

Expand Down Expand Up @@ -1146,9 +1146,9 @@ pub fn park_timeout_ms(ms: u32) {
#[stable(feature = "park_timeout", since = "1.4.0")]
pub fn park_timeout(dur: Duration) {
let guard = PanicGuard;
// SAFETY: park_timeout is called on the parker owned by this thread.
// SAFETY: park_timeout is called on a handle owned by this thread.
unsafe {
current().0.parker().park_timeout(dur);
current().park_timeout(dur);
}
// No panic occurred, do not abort.
forget(guard);
Expand Down Expand Up @@ -1446,6 +1446,15 @@ impl Thread {
unsafe { self.0.parker().park() }
}

/// Like the public [`park_timeout`], but callable on any handle. This is
/// used to allow parking in TLS destructors.
///
/// # Safety
/// May only be called from the thread to which this handle belongs.
pub(crate) unsafe fn park_timeout(&self, dur: Duration) {
unsafe { self.0.parker().park_timeout(dur) }
}

/// Atomically makes the handle's token available if it is not already.
///
/// Every thread is equipped with some basic low-level blocking support, via
Expand Down
7 changes: 4 additions & 3 deletions std/src/thread/scoped.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Builder, JoinInner, Result, Thread, current, park};
use super::{Builder, JoinInner, Result, Thread, current_or_unnamed};
use crate::marker::PhantomData;
use crate::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
use crate::sync::Arc;
Expand Down Expand Up @@ -140,7 +140,7 @@ where
let scope = Scope {
data: Arc::new(ScopeData {
num_running_threads: AtomicUsize::new(0),
main_thread: current(),
main_thread: current_or_unnamed(),
a_thread_panicked: AtomicBool::new(false),
}),
env: PhantomData,
Expand All @@ -152,7 +152,8 @@ where

// Wait until all the threads are finished.
while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
park();
// SAFETY: this is the main thread, the handle belongs to us.
unsafe { scope.data.main_thread.park() };
}

// Throw any panic from `f`, or the return value of `f` if no thread panicked.
Expand Down

0 comments on commit 895f290

Please sign in to comment.