Skip to content

Commit

Permalink
sync: Add RwLock::{try_read, try_write}
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro Lysai committed May 20, 2020
1 parent f480659 commit a62f142
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 1 deletion.
119 changes: 118 additions & 1 deletion tokio/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::sync::batch_semaphore::{AcquireError, Semaphore};
use crate::sync::batch_semaphore::{AcquireError, Semaphore, TryAcquireError};
use std::cell::UnsafeCell;
use std::ops;
use std::{error, fmt};

#[cfg(not(loom))]
const MAX_READS: usize = 32;
Expand Down Expand Up @@ -118,6 +119,14 @@ impl<'a, T> ReleasingPermit<'a, T> {
lock.s.acquire(num_permits).await?;
Ok(Self { num_permits, lock })
}

fn try_acquire(
lock: &'a RwLock<T>,
num_permits: u16,
) -> Result<ReleasingPermit<'a, T>, TryAcquireError> {
lock.s.try_acquire(num_permits)?;
Ok(Self { num_permits, lock })
}
}

impl<'a, T> Drop for ReleasingPermit<'a, T> {
Expand Down Expand Up @@ -215,6 +224,42 @@ impl<T> RwLock<T> {
RwLockReadGuard { lock: self, permit }
}

/// Attempts to lock this rwlock with shared read access, returning [`TryReadError`]
/// if the lock couldn't be acquired immediately without yielding the current task.
///
/// This function will return an error immediately if other writers currently have access to the
/// lock.
///
/// Returns an RAII guard which will drop the read access of this rwlock when dropped.
///
/// [`TryReadError`]: TryReadError
///
/// # Examples
///
/// ```
/// use tokio::sync::RwLock;
///
/// let lock = RwLock::new(1);
///
/// if let Ok(n) = lock.try_read() {
/// assert_eq!(*n, 1);
/// } else {
/// unreachable!()
/// };
/// ```
pub fn try_read(&self) -> Result<RwLockReadGuard<'_, T>, TryReadError> {
let permit = match ReleasingPermit::try_acquire(self, 1) {
Ok(v) => v,
Err(TryAcquireError::NoPermits) => return Err(TryReadError(())),
Err(TryAcquireError::Closed) => {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
}
};
Ok(RwLockReadGuard { lock: self, permit })
}

/// Locks this rwlock with exclusive write access, causing the current task
/// to yield until the lock has been acquired.
///
Expand Down Expand Up @@ -249,6 +294,44 @@ impl<T> RwLock<T> {
RwLockWriteGuard { lock: self, permit }
}

/// Attempts to lock this rwlock with exclusive write access, returning [`TryWriteError`]
/// if the lock couldn't be acquired immediately without yielding the current task.
///
/// This function will return an error immediately if other writers or other readers
/// currently have access to the lock.
///
/// Returns an RAII guard which will drop the write access of this rwlock when dropped.
///
/// [`TryWriteError`]: TryWriteError
///
/// # Examples
///
/// ```
/// use tokio::sync::RwLock;
///
/// let lock = RwLock::new(1);
///
/// assert_eq!(*lock.try_read().unwrap(), 1);
///
/// if let Ok(mut n) = lock.try_write() {
/// *n = 2;
/// }
///
/// assert_eq!(*lock.try_read().unwrap(), 2);
/// ```
pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryWriteError> {
let permit = match ReleasingPermit::try_acquire(self, MAX_READS as u16) {
Ok(v) => v,
Err(TryAcquireError::NoPermits) => return Err(TryWriteError(())),
Err(TryAcquireError::Closed) => {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
}
};
Ok(RwLockWriteGuard { lock: self, permit })
}

/// Consumes the lock, returning the underlying data.
pub fn into_inner(self) -> T {
self.c.into_inner()
Expand Down Expand Up @@ -291,3 +374,37 @@ where
Self::new(T::default())
}
}

/// Error returned by `try_read` when a read lock couldn't be acquired.
pub struct TryReadError(());

impl fmt::Debug for TryReadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TryReadError").finish()
}
}

impl fmt::Display for TryReadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("read lock couldn't be acquired")
}
}

impl error::Error for TryReadError {}

/// Error returned by `try_write` when the write lock couldn't be acquired.
pub struct TryWriteError(());

impl fmt::Debug for TryWriteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TryWriteError").finish()
}
}

impl fmt::Display for TryWriteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("write lock couldn't be acquired")
}
}

impl error::Error for TryWriteError {}
79 changes: 79 additions & 0 deletions tokio/src/sync/tests/loom_rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,49 @@ fn concurrent_write() {
});
}

fn spin_read<T>(lock: &RwLock<T>) -> RwLockReadGuard<T> {
loop {
if let Ok(r) = lock.try_read() {
break r;
}
}
}

fn spin_write<T>(lock: &RwLock<T>) -> RwLockWriteGuard<T> {
loop {
if let Ok(r) = lock.try_write() {
break r;
}
}
}

#[test]
fn concurrent_spin_write() {
let mut b = loom::model::Builder::new();

b.check(|| {
let rwlock = Arc::new(RwLock::<u32>::new(0));

let rwclone = rwlock.clone();
let t1 = thread::spawn(move || {
let mut guard = spin_write(&rwclone);
*guard += 5;
});

let rwclone = rwlock.clone();
let t2 = thread::spawn(move || {
let mut guard = spin_write(&rwclone);
*guard += 5;
});

t1.join().expect("thread 1 write should not panic");
t2.join().expect("thread 2 write should not panic");
//when all threads have finished the value on the lock should be 10
let guard = spin_read(&rwlock);
assert_eq!(10, *guard);
});
}

#[test]
fn concurrent_read_write() {
let mut b = loom::model::Builder::new();
Expand Down Expand Up @@ -76,3 +119,39 @@ fn concurrent_read_write() {
assert_eq!(10, *guard);
});
}

#[test]
fn concurrent_spin_read_write() {
let mut b = loom::model::Builder::new();

b.check(|| {
let rwlock = Arc::new(RwLock::<u32>::new(0));

let rwclone = rwlock.clone();
let t1 = thread::spawn(move || {
let mut guard = spin_write(&rwclone);
*guard += 5;
});

let rwclone = rwlock.clone();
let t2 = thread::spawn(move || {
let mut guard = spin_write(&rwclone);
*guard += 5;
});

let rwclone = rwlock.clone();
let t3 = thread::spawn(move || {
let guard = spin_read(&rwclone);
//at this state the value on the lock may either be 0, 5, or 10
assert!(*guard == 0 || *guard == 5 || *guard == 10);
});

t1.join().expect("thread 1 write should not panic");
t2.join().expect("thread 2 write should not panic");
t3.join().expect("thread 3 read should not panic");

let guard = spin_read(&rwlock);
//when all threads have finished the value on the lock should be 10
assert_eq!(10, *guard);
});
}
1 change: 1 addition & 0 deletions tokio/src/sync/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cfg_not_loom! {
mod atomic_waker;
mod rwlock;
mod semaphore_ll;
mod semaphore_batch;
}
Expand Down
26 changes: 26 additions & 0 deletions tokio/src/sync/tests/rwlock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::sync::rwlock::*;

#[test]
fn serial_try_read_write() {
let l = RwLock::new(42);

{
let g = l.try_read().unwrap();
assert_eq!(*g, 42);

assert!(l.try_write().is_err());

let g2 = l.try_read().unwrap();
assert_eq!(*g2, 42);
}

{
let mut g = l.try_write().unwrap();
assert_eq!(*g, 42);
*g = 4242;

assert!(l.try_read().is_err());
}

assert_eq!(*l.try_read().unwrap(), 4242);
}

0 comments on commit a62f142

Please sign in to comment.