Skip to content

Commit

Permalink
Auto merge of rust-lang#131841 - paulmenage:futex-abstraction, r=joboet
Browse files Browse the repository at this point in the history
Abstract the state type for futexes

In the same way that we expose `SmallAtomic` and `SmallPrimitive` to allow Windows to use a value other than an `AtomicU32` for its futex state, switch the primary futex state type from `AtomicU32` to `futex::Futex`.  The `futex::Futex` type should be usable as an atomic value with underlying primitive type equal to `futex::Primitive`. (`SmallAtomic` is also renamed to `SmallFutex`).

This allows supporting the futex API on systems where the underlying kernel futex implementation requires more user state than simply an `AtomicU32`.

All in-tree futex implementations simply define {`Futex`,`Primitive`} directly as {`AtomicU32`,`u32`}.
  • Loading branch information
bors committed Oct 18, 2024
2 parents eba461c + 01bce29 commit e08bce6
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 66 deletions.
9 changes: 7 additions & 2 deletions std/src/sys/pal/hermit/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ use crate::ptr::null;
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;

pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
Expand Down
9 changes: 7 additions & 2 deletions std/src/sys/pal/unix/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;

/// Waits for a `futex_wake` operation to wake us.
Expand Down
9 changes: 7 additions & 2 deletions std/src/sys/pal/wasm/atomics/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ use core::arch::wasm64 as wasm;
use crate::sync::atomic::AtomicU32;
use crate::time::Duration;

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU32;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU32;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u32;

/// Wait for a futex_wake operation to wake us.
Expand Down
35 changes: 20 additions & 15 deletions std/src/sys/pal/windows/futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,27 @@ use core::{mem, ptr};
use super::api::{self, WinError};
use crate::sys::{c, dur2timeout};

/// An atomic for use as a futex that is at least 32-bits but may be larger
pub type Futex = AtomicU32;
/// Must be the underlying type of Futex
pub type Primitive = u32;

/// An atomic for use as a futex that is at least 8-bits but may be larger.
pub type SmallAtomic = AtomicU8;
/// Must be the underlying type of SmallAtomic
pub type SmallFutex = AtomicU8;
/// Must be the underlying type of SmallFutex
pub type SmallPrimitive = u8;

pub unsafe trait Futex {}
pub unsafe trait Futexable {}
pub unsafe trait Waitable {
type Atomic;
type Futex;
}
macro_rules! unsafe_waitable_int {
($(($int:ty, $atomic:ty)),*$(,)?) => {
$(
unsafe impl Waitable for $int {
type Atomic = $atomic;
type Futex = $atomic;
}
unsafe impl Futex for $atomic {}
unsafe impl Futexable for $atomic {}
)*
};
}
Expand All @@ -42,15 +47,15 @@ unsafe_waitable_int! {
(usize, AtomicUsize),
}
unsafe impl<T> Waitable for *const T {
type Atomic = AtomicPtr<T>;
type Futex = AtomicPtr<T>;
}
unsafe impl<T> Waitable for *mut T {
type Atomic = AtomicPtr<T>;
type Futex = AtomicPtr<T>;
}
unsafe impl<T> Futex for AtomicPtr<T> {}
unsafe impl<T> Futexable for AtomicPtr<T> {}

pub fn wait_on_address<W: Waitable>(
address: &W::Atomic,
address: &W::Futex,
compare: W,
timeout: Option<Duration>,
) -> bool {
Expand All @@ -63,30 +68,30 @@ pub fn wait_on_address<W: Waitable>(
}
}

pub fn wake_by_address_single<T: Futex>(address: &T) {
pub fn wake_by_address_single<T: Futexable>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressSingle(addr);
}
}

pub fn wake_by_address_all<T: Futex>(address: &T) {
pub fn wake_by_address_all<T: Futexable>(address: &T) {
unsafe {
let addr = ptr::from_ref(address).cast::<c_void>();
c::WakeByAddressAll(addr);
}
}

pub fn futex_wait<W: Waitable>(futex: &W::Atomic, expected: W, timeout: Option<Duration>) -> bool {
pub fn futex_wait<W: Waitable>(futex: &W::Futex, expected: W, timeout: Option<Duration>) -> bool {
// return false only on timeout
wait_on_address(futex, expected, timeout) || api::get_last_error() != WinError::TIMEOUT
}

pub fn futex_wake<T: Futex>(futex: &T) -> bool {
pub fn futex_wake<T: Futexable>(futex: &T) -> bool {
wake_by_address_single(futex);
false
}

pub fn futex_wake_all<T: Futex>(futex: &T) {
pub fn futex_wake_all<T: Futexable>(futex: &T) {
wake_by_address_all(futex)
}
7 changes: 3 additions & 4 deletions std/src/sys/sync/condvar/futex.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::Relaxed;
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
use crate::sys::futex::{Futex, futex_wait, futex_wake, futex_wake_all};
use crate::sys::sync::Mutex;
use crate::time::Duration;

pub struct Condvar {
// The value of this atomic is simply incremented on every notification.
// This is used by `.wait()` to not miss any notifications after
// unlocking the mutex and before waiting for notifications.
futex: AtomicU32,
futex: Futex,
}

impl Condvar {
#[inline]
pub const fn new() -> Self {
Self { futex: AtomicU32::new(0) }
Self { futex: Futex::new(0) }
}

// All the memory orderings here are `Relaxed`,
Expand Down
6 changes: 3 additions & 3 deletions std/src/sys/sync/mutex/futex.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::futex::{self, futex_wait, futex_wake};

type Atomic = futex::SmallAtomic;
type Futex = futex::SmallFutex;
type State = futex::SmallPrimitive;

pub struct Mutex {
futex: Atomic,
futex: Futex,
}

const UNLOCKED: State = 0;
Expand All @@ -15,7 +15,7 @@ const CONTENDED: State = 2; // locked, and other threads waiting (contended)
impl Mutex {
#[inline]
pub const fn new() -> Self {
Self { futex: Atomic::new(UNLOCKED) }
Self { futex: Futex::new(UNLOCKED) }
}

#[inline]
Expand Down
25 changes: 12 additions & 13 deletions std/src/sys/sync/once/futex.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
use crate::cell::Cell;
use crate::sync as public;
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sync::once::ExclusiveState;
use crate::sys::futex::{futex_wait, futex_wake_all};
use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake_all};

// On some platforms, the OS is very nice and handles the waiter queue for us.
// This means we only need one atomic value with 4 states:

/// No initialization has run yet, and no thread is currently using the Once.
const INCOMPLETE: u32 = 0;
const INCOMPLETE: Primitive = 0;
/// Some thread has previously attempted to initialize the Once, but it panicked,
/// so the Once is now poisoned. There are no other threads currently accessing
/// this Once.
const POISONED: u32 = 1;
const POISONED: Primitive = 1;
/// Some thread is currently attempting to run initialization. It may succeed,
/// so all future threads need to wait for it to finish.
const RUNNING: u32 = 2;
const RUNNING: Primitive = 2;
/// Initialization has completed and all future calls should finish immediately.
const COMPLETE: u32 = 3;
const COMPLETE: Primitive = 3;

// An additional bit indicates whether there are waiting threads:

/// May only be set if the state is not COMPLETE.
const QUEUED: u32 = 4;
const QUEUED: Primitive = 4;

// Threads wait by setting the QUEUED bit and calling `futex_wait` on the state
// variable. When the running thread finishes, it will wake all waiting threads using
// `futex_wake_all`.

const STATE_MASK: u32 = 0b11;
const STATE_MASK: Primitive = 0b11;

pub struct OnceState {
poisoned: bool,
set_state_to: Cell<u32>,
set_state_to: Cell<Primitive>,
}

impl OnceState {
Expand All @@ -49,8 +48,8 @@ impl OnceState {
}

struct CompletionGuard<'a> {
state_and_queued: &'a AtomicU32,
set_state_on_drop_to: u32,
state_and_queued: &'a Futex,
set_state_on_drop_to: Primitive,
}

impl<'a> Drop for CompletionGuard<'a> {
Expand All @@ -65,13 +64,13 @@ impl<'a> Drop for CompletionGuard<'a> {
}

pub struct Once {
state_and_queued: AtomicU32,
state_and_queued: Futex,
}

impl Once {
#[inline]
pub const fn new() -> Once {
Once { state_and_queued: AtomicU32::new(INCOMPLETE) }
Once { state_and_queued: Futex::new(INCOMPLETE) }
}

#[inline]
Expand Down
2 changes: 1 addition & 1 deletion std/src/sys/sync/once/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
// You'll find a few more details in the implementation, but that's the gist of
// it!
//
// Atomic orderings:
// Futex orderings:
// When running `Once` we deal with multiple atomics:
// `Once.state_and_queue` and an unknown number of `Waiter.signaled`.
// * `state_and_queue` is used (1) as a state flag, (2) for synchronizing the
Expand Down
41 changes: 20 additions & 21 deletions std/src/sys/sync/rwlock/futex.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::sync::atomic::AtomicU32;
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all};
use crate::sys::futex::{Futex, Primitive, futex_wait, futex_wake, futex_wake_all};

pub struct RwLock {
// The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
Expand All @@ -10,41 +9,41 @@ pub struct RwLock {
// 0x3FFF_FFFF: Write locked
// Bit 30: Readers are waiting on this futex.
// Bit 31: Writers are waiting on the writer_notify futex.
state: AtomicU32,
state: Futex,
// The 'condition variable' to notify writers through.
// Incremented on every signal.
writer_notify: AtomicU32,
writer_notify: Futex,
}

const READ_LOCKED: u32 = 1;
const MASK: u32 = (1 << 30) - 1;
const WRITE_LOCKED: u32 = MASK;
const MAX_READERS: u32 = MASK - 1;
const READERS_WAITING: u32 = 1 << 30;
const WRITERS_WAITING: u32 = 1 << 31;
const READ_LOCKED: Primitive = 1;
const MASK: Primitive = (1 << 30) - 1;
const WRITE_LOCKED: Primitive = MASK;
const MAX_READERS: Primitive = MASK - 1;
const READERS_WAITING: Primitive = 1 << 30;
const WRITERS_WAITING: Primitive = 1 << 31;

#[inline]
fn is_unlocked(state: u32) -> bool {
fn is_unlocked(state: Primitive) -> bool {
state & MASK == 0
}

#[inline]
fn is_write_locked(state: u32) -> bool {
fn is_write_locked(state: Primitive) -> bool {
state & MASK == WRITE_LOCKED
}

#[inline]
fn has_readers_waiting(state: u32) -> bool {
fn has_readers_waiting(state: Primitive) -> bool {
state & READERS_WAITING != 0
}

#[inline]
fn has_writers_waiting(state: u32) -> bool {
fn has_writers_waiting(state: Primitive) -> bool {
state & WRITERS_WAITING != 0
}

#[inline]
fn is_read_lockable(state: u32) -> bool {
fn is_read_lockable(state: Primitive) -> bool {
// This also returns false if the counter could overflow if we tried to read lock it.
//
// We don't allow read-locking if there's readers waiting, even if the lock is unlocked
Expand All @@ -55,14 +54,14 @@ fn is_read_lockable(state: u32) -> bool {
}

#[inline]
fn has_reached_max_readers(state: u32) -> bool {
fn has_reached_max_readers(state: Primitive) -> bool {
state & MASK == MAX_READERS
}

impl RwLock {
#[inline]
pub const fn new() -> Self {
Self { state: AtomicU32::new(0), writer_notify: AtomicU32::new(0) }
Self { state: Futex::new(0), writer_notify: Futex::new(0) }
}

#[inline]
Expand Down Expand Up @@ -225,7 +224,7 @@ impl RwLock {
/// If both are waiting, this will wake up only one writer, but will fall
/// back to waking up readers if there was no writer to wake up.
#[cold]
fn wake_writer_or_readers(&self, mut state: u32) {
fn wake_writer_or_readers(&self, mut state: Primitive) {
assert!(is_unlocked(state));

// The readers waiting bit might be turned on at any point now,
Expand Down Expand Up @@ -290,7 +289,7 @@ impl RwLock {

/// Spin for a while, but stop directly at the given condition.
#[inline]
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
let mut spin = 100; // Chosen by fair dice roll.
loop {
let state = self.state.load(Relaxed);
Expand All @@ -303,13 +302,13 @@ impl RwLock {
}

#[inline]
fn spin_write(&self) -> u32 {
fn spin_write(&self) -> Primitive {
// Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
}

#[inline]
fn spin_read(&self) -> u32 {
fn spin_read(&self) -> Primitive {
// Stop spinning when it's unlocked or read locked, or when there's waiting threads.
self.spin_until(|state| {
!is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
Expand Down
Loading

0 comments on commit e08bce6

Please sign in to comment.