diff --git a/src/rw_lock.rs b/src/rw_lock.rs index 89022be..181b26c 100644 --- a/src/rw_lock.rs +++ b/src/rw_lock.rs @@ -1,8 +1,11 @@ -use core::sync::atomic::{AtomicUsize, Ordering, spin_loop_hint as cpu_relax}; use core::cell::UnsafeCell; -use core::ops::{Deref, DerefMut}; -use core::fmt; use core::default::Default; +use core::fmt; +use core::marker::PhantomData; +use core::mem; +use core::ops::{Deref, DerefMut}; +use core::ptr::NonNull; +use core::sync::atomic::{spin_loop_hint as cpu_relax, AtomicUsize, Ordering}; /// A reader-writer lock /// @@ -17,8 +20,19 @@ use core::default::Default; /// locking methods implement `Deref` (and `DerefMut` for the `write` methods) /// to allow access to the contained of the lock. /// -/// Based on -/// +/// An [`RwLockUpgradeableGuard`](RwLockUpgradeableGuard) can be upgraded to a +/// writable guard through the [`RwLockUpgradeableGuard::upgrade`](RwLockUpgradeableGuard::upgrade) +/// [`RwLockUpgradeableGuard::try_upgrade`](RwLockUpgradeableGuard::try_upgrade) functions. +/// Writable or upgradeable guards can be downgraded through their respective `downgrade` +/// functions. +/// +/// Based on Facebook's +/// [`folly/RWSpinLock.h`](https://github.com/facebook/folly/blob/a0394d84f2d5c3e50ebfd0566f9d3acb52cfab5a/folly/synchronization/RWSpinLock.h). +/// This implementation is unfair to writers - if the lock always has readers, then no writers will +/// ever get a chance. Using an upgradeable lock guard can *somewhat* alleviate this issue as no +/// new readers are allowed when an upgradeable guard is held, but upgradeable guards can be taken +/// when there are existing readers. However if the lock is that highly contended and writes are +/// crucial then this implementation may be a poor choice. /// /// # Examples /// @@ -42,41 +56,57 @@ use core::default::Default; /// assert_eq!(*w, 6); /// } // write lock is dropped here /// ``` -pub struct RwLock -{ +pub struct RwLock { lock: AtomicUsize, data: UnsafeCell, } +const READER: usize = 1 << 2; +const UPGRADED: usize = 1 << 1; +const WRITER: usize = 1; + /// A guard from which the protected data can be read /// /// When the guard falls out of scope it will decrement the read count, /// potentially releasing the lock. #[derive(Debug)] -pub struct RwLockReadGuard<'a, T: 'a + ?Sized> -{ +pub struct RwLockReadGuard<'a, T: 'a + ?Sized> { lock: &'a AtomicUsize, - data: &'a T, + data: NonNull, } /// A guard to which the protected data can be written /// /// When the guard falls out of scope it will release the lock. #[derive(Debug)] -pub struct RwLockWriteGuard<'a, T: 'a + ?Sized> -{ +pub struct RwLockWriteGuard<'a, T: 'a + ?Sized> { lock: &'a AtomicUsize, - data: &'a mut T, + data: NonNull, + #[doc(hidden)] + _invariant: PhantomData<&'a mut T>, +} + +/// A guard from which the protected data can be read, and can be upgraded +/// to a writable guard if needed +/// +/// No writers or other upgradeable guards can exist while this is in scope. New reader +/// creation is prevented (to alleviate writer starvation) but there may be existing readers +/// when the lock is acquired. +/// +/// When the guard falls out of scope it will release the lock. +#[derive(Debug)] +pub struct RwLockUpgradeableGuard<'a, T: 'a + ?Sized> { + lock: &'a AtomicUsize, + data: NonNull, + #[doc(hidden)] + _invariant: PhantomData<&'a mut T>, } // Same unsafe impls as `std::sync::RwLock` unsafe impl Send for RwLock {} unsafe impl Sync for RwLock {} -const USIZE_MSB: usize = ::core::isize::MIN as usize; - -impl RwLock -{ +impl RwLock { /// Creates a new spinlock wrapping the supplied data. /// /// May be used statically: @@ -93,18 +123,16 @@ impl RwLock /// } /// ``` #[inline] - pub const fn new(user_data: T) -> RwLock - { - RwLock - { + pub const fn new(user_data: T) -> RwLock { + RwLock { lock: AtomicUsize::new(0), data: UnsafeCell::new(user_data), } } /// Consumes this `RwLock`, returning the underlying data. - pub fn into_inner(self) -> T - { + #[inline] + pub fn into_inner(self) -> T { // We know statically that there are no outstanding references to // `self` so there's no need to lock. let RwLock { data, .. } = self; @@ -112,8 +140,7 @@ impl RwLock } } -impl RwLock -{ +impl RwLock { /// Locks this rwlock with shared read access, blocking the current thread /// until it can be acquired. /// @@ -136,35 +163,12 @@ impl RwLock /// } /// ``` #[inline] - pub fn read<'a>(&'a self) -> RwLockReadGuard<'a, T> - { - // (funny do-while loop) - while { - // Old value, with write bit unset - let mut old; - - // Wait for for writer to go away before doing expensive atomic ops - // (funny do-while loop) - while { - old = self.lock.load(Ordering::Relaxed); - old & USIZE_MSB != 0 - } { - cpu_relax(); + pub fn read(&self) -> RwLockReadGuard { + loop { + match self.try_read() { + Some(guard) => return guard, + None => cpu_relax(), } - - // unset write bit - old &= !USIZE_MSB; - - let new = old + 1; - debug_assert!(new != (!USIZE_MSB) & (!0)); - - self.lock.compare_and_swap(old, new, Ordering::SeqCst) != old - } { - cpu_relax(); - } - RwLockReadGuard { - lock: &self.lock, - data: unsafe { & *self.data.get() }, } } @@ -191,23 +195,20 @@ impl RwLock /// } /// ``` #[inline] - pub fn try_read(&self) -> Option> - { - // Old value, with write bit unset - let old = (!USIZE_MSB) & self.lock.load(Ordering::Relaxed); - - let new = old + 1; - debug_assert!(new != (!USIZE_MSB) & (!0)); - if self.lock.compare_and_swap(old, - new, - Ordering::SeqCst) == old - { + pub fn try_read(&self) -> Option> { + let value = self.lock.fetch_add(READER, Ordering::Acquire); + + // We check the UPGRADED bit here so that new readers are prevented when an UPGRADED lock is held. + // This helps reduce writer starvation. + if value & (WRITER | UPGRADED) != 0 { + // Lock is taken, undo. + self.lock.fetch_sub(READER, Ordering::Release); + None + } else { Some(RwLockReadGuard { lock: &self.lock, - data: unsafe { & *self.data.get() }, + data: unsafe { NonNull::new_unchecked(self.data.get()) }, }) - } else { - None } } @@ -216,20 +217,45 @@ impl RwLock /// This is *extremely* unsafe if there are outstanding `RwLockReadGuard`s /// live, or if called more times than `read` has been called, but can be /// useful in FFI contexts where the caller doesn't know how to deal with - /// RAII. + /// RAII. The underlying atomic operation uses `Ordering::Release`. + #[inline] pub unsafe fn force_read_decrement(&self) { - debug_assert!(self.lock.load(Ordering::Relaxed) & (!USIZE_MSB) > 0); - self.lock.fetch_sub(1, Ordering::SeqCst); + debug_assert!(self.lock.load(Ordering::Relaxed) & !WRITER > 0); + self.lock.fetch_sub(READER, Ordering::Release); } /// Force unlock exclusive write access. /// /// This is *extremely* unsafe if there are outstanding `RwLockWriteGuard`s /// live, or if called when there are current readers, but can be useful in - /// FFI contexts where the caller doesn't know how to deal with RAII. + /// FFI contexts where the caller doesn't know how to deal with RAII. The + /// underlying atomic operation uses `Ordering::Release`. + #[inline] pub unsafe fn force_write_unlock(&self) { - debug_assert_eq!(self.lock.load(Ordering::Relaxed), USIZE_MSB); - self.lock.store(0, Ordering::Relaxed); + debug_assert_eq!(self.lock.load(Ordering::Relaxed) & !(WRITER | UPGRADED), 0); + self.lock.fetch_and(!(WRITER | UPGRADED), Ordering::Release); + } + + #[inline(always)] + fn try_write_internal(&self, strong: bool) -> Option> { + if compare_exchange( + &self.lock, + 0, + WRITER, + Ordering::Acquire, + Ordering::Relaxed, + strong, + ) + .is_ok() + { + Some(RwLockWriteGuard { + lock: &self.lock, + data: unsafe { NonNull::new_unchecked(self.data.get()) }, + _invariant: PhantomData, + }) + } else { + None + } } /// Lock this rwlock with exclusive write access, blocking the current @@ -251,29 +277,13 @@ impl RwLock /// } /// ``` #[inline] - pub fn write<'a>(&'a self) -> RwLockWriteGuard<'a, T> - { - loop - { - // Old value, with write bit unset. - let old = (!USIZE_MSB) & self.lock.load(Ordering::Relaxed); - // Old value, with write bit set. - let new = USIZE_MSB | old; - if self.lock.compare_and_swap(old, - new, - Ordering::SeqCst) == old - { - // Wait for readers to go away, then lock is ours. - while self.lock.load(Ordering::Relaxed) != USIZE_MSB { - cpu_relax(); - } - break + pub fn write(&self) -> RwLockWriteGuard { + loop { + match self.try_write_internal(false) { + Some(guard) => return guard, + None => cpu_relax(), } } - RwLockWriteGuard { - lock: &self.lock, - data: unsafe { &mut *self.data.get() }, - } } /// Attempt to lock this rwlock with exclusive write access. @@ -296,28 +306,42 @@ impl RwLock /// } /// ``` #[inline] - pub fn try_write(&self) -> Option> - { - if self.lock.compare_and_swap(0, - USIZE_MSB, - Ordering::SeqCst) == 0 - { - Some(RwLockWriteGuard { + pub fn try_write(&self) -> Option> { + self.try_write_internal(true) + } + + /// Obtain a readable lock guard that can later be upgraded to a writable lock guard. + /// Upgrades can be done through the [`RwLockUpgradeableGuard::upgrade`](RwLockUpgradeableGuard::upgrade) method. + #[inline] + pub fn upgradeable_read(&self) -> RwLockUpgradeableGuard { + loop { + match self.try_upgradeable_read() { + Some(guard) => return guard, + None => cpu_relax(), + } + } + } + + /// Tries to obtain an upgradeable lock guard. + #[inline] + pub fn try_upgradeable_read(&self) -> Option> { + if self.lock.fetch_or(UPGRADED, Ordering::Acquire) & (WRITER | UPGRADED) == 0 { + Some(RwLockUpgradeableGuard { lock: &self.lock, - data: unsafe { &mut *self.data.get() }, + data: unsafe { NonNull::new_unchecked(self.data.get()) }, + _invariant: PhantomData, }) } else { + // We can't unflip the UPGRADED bit back just yet as there is another upgradeable or write lock. + // When they unlock, they will clear the bit. None } } } -impl fmt::Debug for RwLock -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result - { - match self.try_read() - { +impl fmt::Debug for RwLock { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.try_read() { Some(guard) => write!(f, "RwLock {{ data: ") .and_then(|()| (&*guard).fmt(f)) .and_then(|()| write!(f, "}}")), @@ -332,33 +356,195 @@ impl Default for RwLock { } } +impl<'rwlock, T: ?Sized> RwLockUpgradeableGuard<'rwlock, T> { + #[inline(always)] + fn try_upgrade_internal(self, strong: bool) -> Result, Self> { + if compare_exchange( + &self.lock, + UPGRADED, + WRITER, + Ordering::Acquire, + Ordering::Relaxed, + strong, + ) + .is_ok() + { + // Upgrade successful + let out = Ok(RwLockWriteGuard { + lock: &self.lock, + data: self.data, + _invariant: PhantomData, + }); + + // Forget the old guard so its destructor doesn't run + mem::forget(self); + + out + } else { + Err(self) + } + } + + /// Upgrades an upgradeable lock guard to a writable lock guard. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let upgradeable = mylock.upgradeable_read(); // Readable, but not yet writable + /// let writable = upgradeable.upgrade(); + /// ``` + #[inline] + pub fn upgrade(mut self) -> RwLockWriteGuard<'rwlock, T> { + loop { + self = match self.try_upgrade_internal(false) { + Ok(guard) => return guard, + Err(e) => e, + }; + + cpu_relax(); + } + } + + /// Tries to upgrade an upgradeable lock guard to a writable lock guard. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// let upgradeable = mylock.upgradeable_read(); // Readable, but not yet writable + /// + /// match upgradeable.try_upgrade() { + /// Ok(writable) => /* upgrade successful - use writable lock guard */ (), + /// Err(upgradeable) => /* upgrade unsuccessful */ (), + /// }; + /// ``` + #[inline] + pub fn try_upgrade(self) -> Result, Self> { + self.try_upgrade_internal(true) + } + + #[inline] + /// Downgrades the upgradeable lock guard to a readable, shared lock guard. Cannot fail and is guaranteed not to spin. + /// + /// ``` + /// let mylock = spin::RwLock::new(1); + /// + /// let upgradeable = mylock.upgradeable_read(); + /// assert!(mylock.try_read().is_none()); + /// assert_eq!(*upgradeable, 1); + /// + /// let readable = upgradeable.downgrade(); // This is guaranteed not to spin + /// assert!(mylock.try_read().is_some()); + /// assert_eq!(*readable, 1); + /// ``` + pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> { + // Reserve the read guard for ourselves + self.lock.fetch_add(READER, Ordering::Acquire); + + RwLockReadGuard { + lock: &self.lock, + data: self.data, + } + + // Dropping self removes the UPGRADED bit + } +} + +impl<'rwlock, T: ?Sized> RwLockWriteGuard<'rwlock, T> { + /// Downgrades the writable lock guard to a readable, shared lock guard. Cannot fail and is guaranteed not to spin. + /// + /// ``` + /// let mylock = spin::RwLock::new(0); + /// + /// let mut writable = mylock.write(); + /// *writable = 1; + /// + /// let readable = writable.downgrade(); // This is guaranteed not to spin + /// # let readable_2 = mylock.try_read().unwrap(); + /// assert_eq!(*readable, 1); + /// ``` + #[inline] + pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> { + // Reserve the read guard for ourselves + self.lock.fetch_add(READER, Ordering::Acquire); + + RwLockReadGuard { + lock: &self.lock, + data: self.data, + } + + // Dropping self removes the WRITER bit + } +} + impl<'rwlock, T: ?Sized> Deref for RwLockReadGuard<'rwlock, T> { type Target = T; - fn deref(&self) -> &T { self.data } + fn deref(&self) -> &T { + unsafe { self.data.as_ref() } + } +} + +impl<'rwlock, T: ?Sized> Deref for RwLockUpgradeableGuard<'rwlock, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { self.data.as_ref() } + } } impl<'rwlock, T: ?Sized> Deref for RwLockWriteGuard<'rwlock, T> { type Target = T; - fn deref(&self) -> &T { self.data } + fn deref(&self) -> &T { + unsafe { self.data.as_ref() } + } } impl<'rwlock, T: ?Sized> DerefMut for RwLockWriteGuard<'rwlock, T> { - fn deref_mut(&mut self) -> &mut T { self.data } + fn deref_mut(&mut self) -> &mut T { + unsafe { self.data.as_mut() } + } } impl<'rwlock, T: ?Sized> Drop for RwLockReadGuard<'rwlock, T> { fn drop(&mut self) { - debug_assert!(self.lock.load(Ordering::Relaxed) & (!USIZE_MSB) > 0); - self.lock.fetch_sub(1, Ordering::SeqCst); + debug_assert!(self.lock.load(Ordering::Relaxed) & !(WRITER | UPGRADED) > 0); + self.lock.fetch_sub(READER, Ordering::Release); + } +} + +impl<'rwlock, T: ?Sized> Drop for RwLockUpgradeableGuard<'rwlock, T> { + fn drop(&mut self) { + debug_assert_eq!( + self.lock.load(Ordering::Relaxed) & (WRITER | UPGRADED), + UPGRADED + ); + self.lock.fetch_sub(UPGRADED, Ordering::AcqRel); } } impl<'rwlock, T: ?Sized> Drop for RwLockWriteGuard<'rwlock, T> { fn drop(&mut self) { - debug_assert_eq!(self.lock.load(Ordering::Relaxed), USIZE_MSB); - self.lock.store(0, Ordering::Relaxed); + debug_assert_eq!(self.lock.load(Ordering::Relaxed) & WRITER, WRITER); + + // Writer is responsible for clearing both WRITER and UPGRADED bits. + // The UPGRADED bit may be set if an upgradeable lock attempts an upgrade while this lock is held. + self.lock.fetch_and(!(WRITER | UPGRADED), Ordering::Release); + } +} + +#[inline(always)] +fn compare_exchange( + atomic: &AtomicUsize, + current: usize, + new: usize, + success: Ordering, + failure: Ordering, + strong: bool, +) -> Result { + if strong { + atomic.compare_exchange(current, new, success, failure) + } else { + atomic.compare_exchange_weak(current, new, success, failure) } } @@ -366,9 +552,9 @@ impl<'rwlock, T: ?Sized> Drop for RwLockWriteGuard<'rwlock, T> { mod tests { use std::prelude::v1::*; - use std::sync::Arc; - use std::sync::mpsc::channel; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; use std::thread; use super::*; @@ -418,7 +604,7 @@ mod tests { let arc2 = arc.clone(); let (tx, rx) = channel(); - thread::spawn(move|| { + thread::spawn(move || { let mut lock = arc2.write(); for _ in 0..10 { let tmp = *lock; @@ -433,7 +619,7 @@ mod tests { let mut children = Vec::new(); for _ in 0..5 { let arc3 = arc.clone(); - children.push(thread::spawn(move|| { + children.push(thread::spawn(move || { let lock = arc3.read(); assert!(*lock >= 0); })); @@ -451,10 +637,10 @@ mod tests { } #[test] - fn test_rw_arc_access_in_unwind() { + fn test_rw_access_in_unwind() { let arc = Arc::new(RwLock::new(1)); let arc2 = arc.clone(); - let _ = thread::spawn(move|| -> () { + let _ = thread::spawn(move || -> () { struct Unwinder { i: Arc>, } @@ -466,7 +652,8 @@ mod tests { } let _u = Unwinder { i: arc2 }; panic!(); - }).join(); + }) + .join(); let lock = arc.read(); assert_eq!(*lock, 2); } @@ -493,12 +680,22 @@ mod tests { let write_result = lock.try_write(); match write_result { None => (), - Some(_) => assert!(false, "try_write should not succeed while read_guard is in scope"), + Some(_) => assert!( + false, + "try_write should not succeed while read_guard is in scope" + ), } drop(read_guard); } + #[test] + fn test_rw_try_read() { + let m = RwLock::new(0); + mem::forget(m.write()); + assert!(m.try_read().is_none()); + } + #[test] fn test_into_inner() { let m = RwLock::new(NonCopy(10)); @@ -551,4 +748,30 @@ mod tests { } assert!(m.try_read().is_some()); } + + #[test] + fn test_upgrade_downgrade() { + let m = RwLock::new(()); + { + let _r = m.read(); + let upg = m.try_upgradeable_read().unwrap(); + assert!(m.try_read().is_none()); + assert!(m.try_write().is_none()); + assert!(upg.try_upgrade().is_err()); + } + { + let w = m.write(); + assert!(m.try_upgradeable_read().is_none()); + let _r = w.downgrade(); + assert!(m.try_upgradeable_read().is_some()); + assert!(m.try_read().is_some()); + assert!(m.try_write().is_none()); + } + { + let _u = m.upgradeable_read(); + assert!(m.try_upgradeable_read().is_none()); + } + + assert!(m.try_upgradeable_read().unwrap().try_upgrade().is_ok()); + } }