From 8fa2fd35fd8c74c7e959744b1ac47987cdd47f53 Mon Sep 17 00:00:00 2001
From: Cormac Relf
Date: Fri, 1 Nov 2019 22:31:39 +1100
Subject: [PATCH] wasm implementation of RawMutex and RawRwlock using UnsafeCell

---
 Cargo.toml             |   1 +
 core/Cargo.toml        |   2 +-
 src/lib.rs             |  19 +-
 src/wasm/mod.rs        |   2 +
 src/wasm/raw_mutex.rs  | 128 ++++++++++++++
 src/wasm/raw_rwlock.rs | 393 +++++++++++++++++++++++++++++++++++++++++
 6 files changed, 541 insertions(+), 4 deletions(-)
 create mode 100644 src/wasm/mod.rs
 create mode 100644 src/wasm/raw_mutex.rs
 create mode 100644 src/wasm/raw_rwlock.rs

diff --git a/Cargo.toml b/Cargo.toml
index 1e9230b0..faa13e89 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ edition = "2018"
 [dependencies]
 parking_lot_core = { path = "core", version = "0.6.2" }
 lock_api = { path = "lock_api", version = "0.3.1" }
+cfg-if = "0.1.10"
 
 [dev-dependencies]
 rand = "0.7"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index ed938175..1e47d515 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -10,7 +10,7 @@ categories = ["concurrency"]
 edition = "2018"
 
 [dependencies]
-cfg-if = "0.1.5"
+cfg-if = "0.1.10"
 smallvec = "0.6"
 petgraph = { version = "0.4.5", optional = true }
 thread-id = { version = "3.2.0", optional = true }
diff --git a/src/lib.rs b/src/lib.rs
index db0d6b3e..eb0ddd5d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,12 +13,25 @@
 #![warn(rust_2018_idioms)]
 #![cfg_attr(feature = "nightly", feature(asm))]
 
+cfg_if::cfg_if! {
+    if #[cfg(target_arch = "wasm32")] {
+        mod wasm;
+        mod raw_mutex {
+            pub use crate::wasm::raw_mutex::*;
+        }
+        mod raw_rwlock {
+            pub use crate::wasm::raw_rwlock::*;
+        }
+    } else {
+        mod raw_mutex;
+        mod raw_rwlock;
+        mod elision;
+    }
+}
+
 mod condvar;
-mod elision;
 mod mutex;
 mod once;
-mod raw_mutex;
-mod raw_rwlock;
 mod remutex;
 mod rwlock;
 mod util;
diff --git a/src/wasm/mod.rs b/src/wasm/mod.rs
new file mode 100644
index 00000000..0ced034f
--- /dev/null
+++ b/src/wasm/mod.rs
@@ -0,0 +1,2 @@
+pub mod raw_mutex;
+pub mod raw_rwlock;
diff --git a/src/wasm/raw_mutex.rs b/src/wasm/raw_mutex.rs
new file mode 100644
index 00000000..0553452d
--- /dev/null
+++ b/src/wasm/raw_mutex.rs
@@ -0,0 +1,128 @@
+// Eventually, when access to WASM i32_atomic_wait is stable, this should look more like
+// https://github.com/rust-lang/rust/blob/f51752774bbbe48d2aabe53c86e9e91ed3a73a5d/src/libstd/sys/wasm/mutex_atomics.rs#L81-L160
+//
+// For now, we essentially do what
+// https://github.com/rust-lang/rust/blob/253fc0ed742c235fa34c5d78814fa7b8a5e5e055/src/libstd/sys/wasm/mutex.rs does.
+
+use std::cell::UnsafeCell;
+
+const LOCKED_BIT: u8 = 1;
+const PARKED_BIT: u8 = 2;
+
+// UnparkToken used to indicate that the target thread should attempt to
+// lock the mutex again as soon as it is unparked.
+pub(crate) const TOKEN_NORMAL: UnparkToken = UnparkToken(0);
+
+// UnparkToken used to indicate that the mutex is being handed off to the target
+// thread directly without unlocking it.
+pub(crate) const TOKEN_HANDOFF: UnparkToken = UnparkToken(1);
+
+/// Raw mutex type backed by the parking lot.
+pub struct RawMutex {
+    state: UnsafeCell<u8>,
+}
+
+unsafe impl Send for RawMutex {}
+unsafe impl Sync for RawMutex {} // no threads on wasm
+
+use crate::deadlock;
+use core::time::Duration;
+use lock_api::{GuardSend, RawMutex as RawMutexTrait, RawMutexFair, RawMutexTimed};
+use parking_lot_core::{self, UnparkToken};
+use std::time::Instant;
+
+unsafe impl RawMutexTrait for RawMutex {
+    const INIT: RawMutex = RawMutex {
+        state: UnsafeCell::new(0u8),
+    };
+
+    type GuardMarker = GuardSend;
+
+    #[inline]
+    fn lock(&self) {
+        unsafe {
+            let state = self.state.get();
+            assert!(
+                (*state & LOCKED_BIT) == 0,
+                "cannot recursively acquire Mutex"
+            );
+            *state |= LOCKED_BIT;
+            deadlock::acquire_resource(self as *const _ as usize);
+        };
+    }
+
+    #[inline]
+    fn try_lock(&self) -> bool {
+        let state = self.state.get();
+        unsafe {
+            if *state & LOCKED_BIT > 0 {
+                false
+            } else {
+                *state |= LOCKED_BIT;
+                deadlock::acquire_resource(self as *const _ as usize);
+                true
+            }
+        }
+    }
+
+    #[inline]
+    fn unlock(&self) {
+        unsafe {
+            deadlock::release_resource(self as *const _ as usize);
+            let state = self.state.get();
+            *state &= !LOCKED_BIT;
+        };
+    }
+}
+
+unsafe impl RawMutexTimed for RawMutex {
+    type Duration = Duration;
+    type Instant = Instant;
+
+    #[inline]
+    fn try_lock_until(&self, _timeout: Instant) -> bool {
+        self.try_lock()
+    }
+
+    #[inline]
+    fn try_lock_for(&self, _timeout: Duration) -> bool {
+        self.try_lock()
+    }
+}
+
+unsafe impl RawMutexFair for RawMutex {
+    #[inline]
+    fn unlock_fair(&self) {
+        self.unlock()
+    }
+
+    #[inline]
+    fn bump(&self) {}
+}
+
+impl RawMutex {
+    // Used by Condvar when requeuing threads to us, must be called while
+    // holding the queue lock.
+    // false if unlocked
+    #[inline]
+    pub(crate) fn mark_parked_if_locked(&self) -> bool {
+        unsafe {
+            let state = self.state.get();
+            if *state & LOCKED_BIT == 0 {
+                false
+            } else {
+                *state |= PARKED_BIT;
+                true
+            }
+        }
+    }
+
+    // Used by Condvar when requeuing threads to us, must be called while
+    // holding the queue lock.
+    #[inline]
+    pub(crate) fn mark_parked(&self) {
+        unsafe {
+            *self.state.get() |= PARKED_BIT;
+        }
+    }
+}
diff --git a/src/wasm/raw_rwlock.rs b/src/wasm/raw_rwlock.rs
new file mode 100644
index 00000000..13d32e76
--- /dev/null
+++ b/src/wasm/raw_rwlock.rs
@@ -0,0 +1,393 @@
+// Copyright 2019 Cormac Relf
+//
+// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT
+// or http://opensource.org/licenses/MIT>, at your option. This file may not be
+// copied, modified, or distributed except according to those terms.
+
+use core::cell::UnsafeCell;
+use core::time::Duration;
+use lock_api::{
+    GuardSend, RawRwLock as RawRwLockTrait, RawRwLockDowngrade, RawRwLockFair, RawRwLockRecursive,
+    RawRwLockRecursiveTimed, RawRwLockTimed, RawRwLockUpgrade, RawRwLockUpgradeDowngrade,
+    RawRwLockUpgradeFair, RawRwLockUpgradeTimed,
+};
+use parking_lot_core::deadlock;
+use std::time::Instant;
+
+// This reader-writer lock implementation is based on Boost's upgrade_mutex:
+// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
+//
+// This implementation uses 2 wait queues, one at key [addr] and one at key
+// [addr + 1]. The primary queue is used for all new waiting threads, and the
+// secondary queue is used by the thread which has acquired WRITER_BIT but is
+// waiting for the remaining readers to exit the lock.
+//
+// This implementation is fair between readers and writers since it uses the
+// order in which threads first started queuing to alternate between read phases
+// and write phases. In particular, it is not vulnerable to write starvation
+// since readers will block if there is a pending writer.

+// There is at least one thread in the main queue.
+const PARKED_BIT: usize = 0b0001;
+// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
+const WRITER_PARKED_BIT: usize = 0b0010;
+// A reader is holding an upgradable lock. The reader count must be non-zero and
+// WRITER_BIT must not be set.
+const UPGRADABLE_BIT: usize = 0b0100;
+// If the reader count is zero: a writer is currently holding an exclusive lock.
+// Otherwise: a writer is waiting for the remaining readers to exit the lock.
+const WRITER_BIT: usize = 0b1000;
+// Mask of bits used to count readers.
+const READERS_MASK: usize = !0b1111;
+// Base unit for counting readers.
+const ONE_READER: usize = 0b10000;
+
+// Token indicating what type of lock a queued thread is trying to acquire
+// const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
+// const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
+// const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);
+
+/// Raw reader-writer lock type backed by the parking lot.
+pub struct RawRwLock {
+    state: UnsafeCell<usize>,
+}
+
+unsafe impl Send for RawRwLock {}
+unsafe impl Sync for RawRwLock {} // no threads on wasm
+
+unsafe impl RawRwLockTrait for RawRwLock {
+    const INIT: RawRwLock = RawRwLock {
+        state: UnsafeCell::new(0),
+    };
+
+    type GuardMarker = GuardSend;
+
+    #[inline]
+    fn lock_exclusive(&self) {
+        let res = self.try_lock_exclusive();
+        if !res {
+            panic!("could not acquire exclusive lock");
+        }
+    }
+
+    #[inline]
+    fn try_lock_exclusive(&self) -> bool {
+        unsafe {
+            let state = self.state.get();
+            if *state == 0 {
+                *state = WRITER_BIT;
+                self.deadlock_acquire();
+                true
+            } else {
+                false
+            }
+        }
+    }
+
+    #[inline]
+    fn unlock_exclusive(&self) {
+        self.deadlock_release();
+        unsafe {
+            let state = self.state.get();
+            if *state == WRITER_BIT {
+                *state = 0;
+            } else {
+                panic!("did not have exclusive lock to unlock")
+            }
+        }
+    }
+
+    #[inline]
+    fn lock_shared(&self) {
+        let res = self.try_lock_shared_fast(false);
+        if !res {
+            panic!("could not acquire shared lock");
+        }
+        self.deadlock_acquire();
+    }
+
+    #[inline]
+    fn try_lock_shared(&self) -> bool {
+        let res = self.try_lock_shared_fast(false);
+        if res {
+            self.deadlock_acquire();
+        }
+        res
+    }
+
+    #[inline]
+    fn unlock_shared(&self) {
+        self.deadlock_release();
+        let state = self.state.get();
+        unsafe {
+            *state = (*state)
+                .checked_sub(ONE_READER)
+                .expect("RwLock reader count underflow");
+            // WRITER_PARKED_BIT is never set on wasm (there is no parking), so
+            // there is no parked writer to wake here.
+        }
+    }
+}
+
+unsafe impl RawRwLockFair for RawRwLock {
+    #[inline]
+    fn unlock_shared_fair(&self) {
+        // Shared unlocking is always fair in this implementation.
+        self.unlock_shared();
+    }
+
+    #[inline]
+    fn unlock_exclusive_fair(&self) {
+        self.unlock_exclusive();
+    }
+
+    #[inline]
+    fn bump_shared(&self) {}
+
+    #[inline]
+    fn bump_exclusive(&self) {}
+}
+
+unsafe impl RawRwLockDowngrade for RawRwLock {
+    #[inline]
+    fn downgrade(&self) {
+        let state = self.state.get();
+        unsafe {
+            *state += ONE_READER - WRITER_BIT;
+            // PARKED_BIT is never set on wasm (there is no parking), so there
+            // are no parked shared or upgradable threads to wake up here.
+        }
+    }
+}
+
+unsafe impl RawRwLockTimed for RawRwLock {
+    type Duration = Duration;
+    type Instant = Instant;
+
+    #[inline]
+    fn try_lock_shared_for(&self, _timeout: Self::Duration) -> bool {
+        self.try_lock_shared()
+    }
+
+    #[inline]
+    fn try_lock_shared_until(&self, _timeout: Self::Instant) -> bool {
+        self.try_lock_shared()
+    }
+
+    #[inline]
+    fn try_lock_exclusive_for(&self, _timeout: Duration) -> bool {
+        self.try_lock_exclusive()
+    }
+
+    #[inline]
+    fn try_lock_exclusive_until(&self, _timeout: Instant) -> bool {
+        self.try_lock_exclusive()
+    }
+}
+
+unsafe impl RawRwLockRecursive for RawRwLock {
+    #[inline]
+    fn lock_shared_recursive(&self) {
+        let result = self.try_lock_shared_fast(true);
+        if !result {
+            panic!("could not acquire shared recursive lock");
+        }
+        self.deadlock_acquire();
+    }
+
+    #[inline]
+    fn try_lock_shared_recursive(&self) -> bool {
+        let result = self.try_lock_shared_fast(true);
+        if result {
+            self.deadlock_acquire();
+        }
+        result
+    }
+}
+
+unsafe impl RawRwLockRecursiveTimed for RawRwLock {
+    #[inline]
+    fn try_lock_shared_recursive_for(&self, _timeout: Self::Duration) -> bool {
+        self.try_lock_shared_recursive()
+    }
+
+    #[inline]
+    fn try_lock_shared_recursive_until(&self, _timeout: Self::Instant) -> bool {
+        self.try_lock_shared_recursive()
+    }
+}
+
+unsafe impl RawRwLockUpgrade for RawRwLock {
+    #[inline]
+    fn lock_upgradable(&self) {
+        if !self.try_lock_upgradable_fast() {
+            panic!("could not acquire upgradable lock");
+        }
+        self.deadlock_acquire();
+    }
+
+    #[inline]
+    fn try_lock_upgradable(&self) -> bool {
+        let res = self.try_lock_upgradable_fast();
+        if res {
+            self.deadlock_acquire();
+        }
+        res
+    }
+
+    #[inline]
+    fn unlock_upgradable(&self) {
+        self.deadlock_release();
+        let state = self.state.get();
+        unsafe {
+            // PARKED_BIT is never set on wasm, so there are no waiters to hand off to.
+            if *state & PARKED_BIT == 0 {
+                *state -= ONE_READER | UPGRADABLE_BIT;
+            }
+        }
+    }
+
+    #[inline]
+    fn upgrade(&self) {
+        let state = self.state.get();
+        unsafe {
+            if *state & READERS_MASK != ONE_READER {
+                panic!("cannot upgrade while other readers hold the lock")
+            }
+            *state -= (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT;
+        }
+    }
+
+    #[inline]
+    fn try_upgrade(&self) -> bool {
+        let state = self.state.get();
+        unsafe {
+            if *state == ONE_READER | UPGRADABLE_BIT {
+                *state = WRITER_BIT;
+                true
+            } else {
+                false
+            }
+        }
+    }
+}
+
+unsafe impl RawRwLockUpgradeFair for RawRwLock {
+    #[inline]
+    fn unlock_upgradable_fair(&self) {
+        self.unlock_upgradable();
+    }
+
+    #[inline]
+    fn bump_upgradable(&self) {}
+}
+
+unsafe impl RawRwLockUpgradeDowngrade for RawRwLock {
+    #[inline]
+    fn downgrade_upgradable(&self) {
+        let state = self.state.get();
+        unsafe {
+            *state -= UPGRADABLE_BIT;
+            // Wake up parked upgradable threads if there are any
+            if *state & PARKED_BIT != 0 {
+                panic!("cannot downgrade_slow on wasm, as no parking");
+                // self.downgrade_slow();
+            }
+        }
+    }
+
+    #[inline]
+    fn downgrade_to_upgradable(&self) {
+        let state = self.state.get();
+        unsafe {
+            *state += (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT;
+
+            // Wake up parked shared threads if there are any
+            if *state & PARKED_BIT != 0 {
+                panic!("cannot downgrade_to_upgradable_slow on wasm, as no parking");
+            }
+        }
+    }
+}
+
+unsafe impl RawRwLockUpgradeTimed for RawRwLock {
+    #[inline]
+    fn try_lock_upgradable_until(&self, _timeout: Instant) -> bool {
+        self.try_lock_upgradable()
+    }
+
+    #[inline]
+    fn try_lock_upgradable_for(&self, _timeout: Duration) -> bool {
+        self.try_lock_upgradable()
+    }
+
+    #[inline]
+    fn try_upgrade_until(&self, _timeout: Self::Instant) -> bool {
+        self.try_upgrade()
+    }
+
+    #[inline]
+    fn try_upgrade_for(&self, _timeout: Duration) -> bool {
+        self.try_upgrade()
+    }
+}
+
+impl RawRwLock {
+    #[inline(always)]
+    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
+        let state = self.state.get();
+
+        unsafe {
+            // We can't allow grabbing a shared lock if there is a writer, even if
+            // the writer is still waiting for the remaining readers to exit the lock.
+            if *state & WRITER_BIT != 0 {
+                // To allow recursive locks, we make an exception and allow readers
+                // to skip ahead of a pending writer to avoid deadlocking, at the
+                // cost of breaking the fairness guarantees.
+                if !recursive || *state & READERS_MASK == 0 {
+                    return false;
+                }
+            }
+            *state = (*state)
+                .checked_add(ONE_READER)
+                .expect("RwLock reader count overflow");
+        }
+        true
+    }
+
+    #[inline(always)]
+    fn try_lock_upgradable_fast(&self) -> bool {
+        let state = self.state.get();
+
+        unsafe {
+            // We can't grab an upgradable lock if there is already a writer or
+            // upgradable reader.
+            if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
+                return false;
+            }
+
+            if let Some(new_state) = (*state).checked_add(ONE_READER | UPGRADABLE_BIT) {
+                *state = new_state;
+                true
+            } else {
+                false
+            }
+        }
+    }
+
+    #[inline]
+    fn deadlock_acquire(&self) {
+        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
+        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
+    }
+
+    #[inline]
+    fn deadlock_release(&self) {
+        unsafe { deadlock::release_resource(self as *const _ as usize) };
+        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
+    }
+}
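
A minimal usage sketch follows (not part of the patch above). It assumes the patch is applied and the crate is built for wasm32, and drives the new raw types through parking_lot's existing Mutex and RwLock wrappers from lock_api. Because this target is single-threaded, contention cannot occur, so the raw implementations panic where the threaded versions would park.

    // Illustrative only; not part of the diff above.
    use parking_lot::{Mutex, RwLock};

    fn main() {
        let m = Mutex::new(0u32);
        *m.lock() += 1;                  // sets LOCKED_BIT in the UnsafeCell<u8> state
        assert!(m.try_lock().is_some()); // uncontended, so try_lock succeeds

        let rw = RwLock::new(vec![1, 2, 3]);
        {
            let r1 = rw.read();          // adds ONE_READER to the state word
            let r2 = rw.read();          // multiple shared guards may coexist
            assert_eq!(r1.len() + r2.len(), 6);
        }                                // reader count drops back to zero
        rw.write().push(4);              // takes WRITER_BIT once no readers remain
    }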