From efde6374070481b8c941cac2585203a40aabccbb Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Wed, 20 Dec 2023 17:41:45 +0100 Subject: [PATCH] [#51] Introduce WaitResult & WaitAction to replace bool return values --- .../src/condition_variable.rs | 12 +- iceoryx2-pal/concurrency-sync/src/lib.rs | 12 ++ iceoryx2-pal/concurrency-sync/src/mutex.rs | 36 +++-- iceoryx2-pal/concurrency-sync/src/rwlock.rs | 82 +++++++---- .../concurrency-sync/src/semaphore.rs | 16 +-- .../tests/condition_variable_tests.rs | 34 ++--- .../concurrency-sync/tests/mutex_tests.rs | 18 +-- .../concurrency-sync/tests/rwlock_tests.rs | 132 +++++++++--------- .../concurrency-sync/tests/semaphore_tests.rs | 20 +-- 9 files changed, 204 insertions(+), 158 deletions(-) diff --git a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs index 5317a6770..756150e8a 100644 --- a/iceoryx2-pal/concurrency-sync/src/condition_variable.rs +++ b/iceoryx2-pal/concurrency-sync/src/condition_variable.rs @@ -13,7 +13,7 @@ use core::sync::atomic::{AtomicU32, Ordering}; pub use crate::mutex::Mutex; -use crate::semaphore::Semaphore; +use crate::{semaphore::Semaphore, WaitAction, WaitResult}; pub struct ConditionVariable { number_of_waiters: AtomicU32, @@ -48,21 +48,21 @@ impl ConditionVariable { pub fn wait< WakeOne: Fn(&AtomicU32), - Wait: Fn(&AtomicU32, &u32) -> bool, - MtxWait: Fn(&AtomicU32, &u32) -> bool, + Wait: Fn(&AtomicU32, &u32) -> WaitAction, + MtxWait: Fn(&AtomicU32, &u32) -> WaitAction, >( &self, mtx: &Mutex, mtx_wake_one: WakeOne, wait: Wait, mtx_wait: MtxWait, - ) -> bool { + ) -> WaitResult { self.number_of_waiters.fetch_add(1, Ordering::Relaxed); mtx.unlock(mtx_wake_one); - if !self.semaphore.wait(wait) { + if self.semaphore.wait(wait) == WaitResult::Interrupted { self.number_of_waiters.fetch_sub(1, Ordering::Relaxed); - return false; + return WaitResult::Interrupted; } self.number_of_waiters.fetch_sub(1, Ordering::Relaxed); diff --git a/iceoryx2-pal/concurrency-sync/src/lib.rs b/iceoryx2-pal/concurrency-sync/src/lib.rs index ea12bd325..d444448e2 100644 --- a/iceoryx2-pal/concurrency-sync/src/lib.rs +++ b/iceoryx2-pal/concurrency-sync/src/lib.rs @@ -19,3 +19,15 @@ pub mod condition_variable; pub mod mutex; pub mod rwlock; pub mod semaphore; + +#[derive(Debug, PartialEq, Eq)] +pub enum WaitAction { + Continue, + Abort, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum WaitResult { + Interrupted, + Success, +} diff --git a/iceoryx2-pal/concurrency-sync/src/mutex.rs b/iceoryx2-pal/concurrency-sync/src/mutex.rs index ce92953cf..20171a543 100644 --- a/iceoryx2-pal/concurrency-sync/src/mutex.rs +++ b/iceoryx2-pal/concurrency-sync/src/mutex.rs @@ -15,6 +15,8 @@ use core::{ sync::atomic::{AtomicU32, Ordering}, }; +use crate::{WaitAction, WaitResult}; + pub struct Mutex { // we use an AtomicU32 since it should be supported on nearly every platform state: AtomicU32, @@ -33,24 +35,24 @@ impl Mutex { } } - pub fn lock bool>(&self, wait: Wait) -> bool { - if self.uncontested_lock(crate::SPIN_REPETITIONS) { - return true; + pub fn lock WaitAction>(&self, wait: Wait) -> WaitResult { + if self.uncontested_lock(crate::SPIN_REPETITIONS) == WaitResult::Success { + return WaitResult::Success; } loop { - let keep_running = wait(&self.state, &1); + let action = wait(&self.state, &1); if self .state .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) .is_ok() { - return true; + return WaitResult::Success; } - if !keep_running { - return false; + if action == WaitAction::Abort { + return WaitResult::Interrupted; } } } @@ -60,15 +62,21 @@ impl Mutex { wake_one(&self.state); } - pub fn try_lock(&self) -> bool { - self.state + pub fn try_lock(&self) -> WaitResult { + if self + .state .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - fn uncontested_lock(&self, retry_limit: u64) -> bool { - if self.try_lock() { - return true; + fn uncontested_lock(&self, retry_limit: u64) -> WaitResult { + if self.try_lock() == WaitResult::Success { + return WaitResult::Success; } let mut retry_counter = 0; @@ -81,10 +89,10 @@ impl Mutex { retry_counter += 1; if retry_limit == retry_counter { - return false; + return WaitResult::Interrupted; } } - true + WaitResult::Success } } diff --git a/iceoryx2-pal/concurrency-sync/src/rwlock.rs b/iceoryx2-pal/concurrency-sync/src/rwlock.rs index 4e74a434e..28c1a2bb9 100644 --- a/iceoryx2-pal/concurrency-sync/src/rwlock.rs +++ b/iceoryx2-pal/concurrency-sync/src/rwlock.rs @@ -15,7 +15,7 @@ use core::{ sync::atomic::{AtomicU32, Ordering}, }; -use crate::SPIN_REPETITIONS; +use crate::{WaitAction, WaitResult, SPIN_REPETITIONS}; const WRITE_LOCKED: u32 = u32::MAX; const UNLOCKED: u32 = 0; @@ -37,14 +37,15 @@ impl RwLockReaderPreference { Self::default() } - pub fn try_read_lock(&self) -> bool { + pub fn try_read_lock(&self) -> WaitResult { let reader_count = self.reader_count.load(Ordering::Relaxed); if reader_count == WRITE_LOCKED { - return false; + return WaitResult::Interrupted; } - self.reader_count + if self + .reader_count .compare_exchange( reader_count, reader_count + 1, @@ -52,9 +53,14 @@ impl RwLockReaderPreference { Ordering::Relaxed, ) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - pub fn read_lock bool>(&self, wait: F) -> bool { + pub fn read_lock WaitAction>(&self, wait: F) -> WaitResult { let mut reader_count = self.reader_count.load(Ordering::Relaxed); let mut retry_counter = 0; @@ -66,13 +72,13 @@ impl RwLockReaderPreference { } if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { retry_counter += 1; spin_loop(); - } else if !wait(&self.reader_count, &reader_count) { + } else if wait(&self.reader_count, &reader_count) == WaitAction::Abort { keep_running = false; } @@ -85,7 +91,7 @@ impl RwLockReaderPreference { Ordering::Acquire, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => { reader_count = v; } @@ -103,13 +109,19 @@ impl RwLockReaderPreference { wake_one(&self.reader_count); } - pub fn try_write_lock(&self) -> bool { - self.reader_count + pub fn try_write_lock(&self) -> WaitResult { + if self + .reader_count .compare_exchange(UNLOCKED, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - pub fn write_lock bool>(&self, wait: F) -> bool { + pub fn write_lock WaitAction>(&self, wait: F) -> WaitResult { let mut retry_counter = 0; let mut reader_count; @@ -122,17 +134,17 @@ impl RwLockReaderPreference { Ordering::Relaxed, ) { Err(v) => reader_count = v, - Ok(_) => return true, + Ok(_) => return WaitResult::Success, }; if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { retry_counter += 1; spin_loop(); - } else if !wait(&self.reader_count, &reader_count) { + } else if wait(&self.reader_count, &reader_count) == WaitAction::Abort { keep_running = false; } } @@ -158,18 +170,24 @@ impl RwLockWriterPreference { Self::default() } - pub fn try_read_lock(&self) -> bool { + pub fn try_read_lock(&self) -> WaitResult { let state = self.state.load(Ordering::Relaxed); if state % 2 == 1 { - return false; + return WaitResult::Interrupted; } - self.state + if self + .state .compare_exchange(state, state + 2, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } - pub fn read_lock bool>(&self, wait: F) -> bool { + pub fn read_lock WaitAction>(&self, wait: F) -> WaitResult { let mut state = self.state.load(Ordering::Relaxed); let mut retry_counter = 0; @@ -177,13 +195,13 @@ impl RwLockWriterPreference { loop { if state % 2 == 1 { if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { retry_counter += 1; spin_loop(); - } else if !wait(&self.state, &state) { + } else if wait(&self.state, &state) == WaitAction::Abort { keep_running = false; } state = self.state.load(Ordering::Relaxed); @@ -194,7 +212,7 @@ impl RwLockWriterPreference { Ordering::Acquire, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => state = v, } } @@ -218,14 +236,20 @@ impl RwLockWriterPreference { } } - pub fn try_write_lock(&self) -> bool { - self.state + pub fn try_write_lock(&self) -> WaitResult { + if self + .state .compare_exchange(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_ok() + { + WaitResult::Success + } else { + WaitResult::Interrupted + } } pub fn write_lock< - Wait: Fn(&AtomicU32, &u32) -> bool, + Wait: Fn(&AtomicU32, &u32) -> WaitAction, WakeOne: Fn(&AtomicU32), WakeAll: Fn(&AtomicU32), >( @@ -233,7 +257,7 @@ impl RwLockWriterPreference { wait: Wait, wake_one: WakeOne, wake_all: WakeAll, - ) -> bool { + ) -> WaitResult { let mut state = self.state.load(Ordering::Relaxed); let mut keep_running = true; @@ -246,7 +270,7 @@ impl RwLockWriterPreference { Ordering::Acquire, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => state = v, } } @@ -264,12 +288,12 @@ impl RwLockWriterPreference { self.writer_wake_counter.fetch_add(1, Ordering::Relaxed); wake_one(&self.writer_wake_counter); wake_all(&self.state); - return false; + return WaitResult::Interrupted; } Err(v) => state = v, } } else { - return false; + return WaitResult::Interrupted; } } } @@ -288,7 +312,7 @@ impl RwLockWriterPreference { retry_counter += 1; } else { let writer_wake_counter = self.writer_wake_counter.load(Ordering::Relaxed); - if !wait(&self.writer_wake_counter, &writer_wake_counter) { + if wait(&self.writer_wake_counter, &writer_wake_counter) == WaitAction::Abort { keep_running = false; } } diff --git a/iceoryx2-pal/concurrency-sync/src/semaphore.rs b/iceoryx2-pal/concurrency-sync/src/semaphore.rs index 45072d3bd..b0a962cf5 100644 --- a/iceoryx2-pal/concurrency-sync/src/semaphore.rs +++ b/iceoryx2-pal/concurrency-sync/src/semaphore.rs @@ -15,7 +15,7 @@ use core::{ sync::atomic::{AtomicU32, Ordering}, }; -use crate::SPIN_REPETITIONS; +use crate::{WaitAction, WaitResult, SPIN_REPETITIONS}; pub struct Semaphore { value: AtomicU32, @@ -37,7 +37,7 @@ impl Semaphore { wakeup(&self.value); } - pub fn wait bool>(&self, wait: Wait) -> bool { + pub fn wait WaitAction>(&self, wait: Wait) -> WaitResult { let mut retry_counter = 0; let mut current_value = self.value.load(Ordering::Relaxed); @@ -49,13 +49,13 @@ impl Semaphore { } if !keep_running { - return false; + return WaitResult::Interrupted; } if retry_counter < SPIN_REPETITIONS { spin_loop(); retry_counter += 1; - } else if !wait(&self.value, ¤t_value) { + } else if wait(&self.value, ¤t_value) == WaitAction::Abort { keep_running = false; } current_value = self.value.load(Ordering::Relaxed); @@ -72,15 +72,15 @@ impl Semaphore { } } - true + WaitResult::Success } - pub fn try_wait(&self) -> bool { + pub fn try_wait(&self) -> WaitResult { let mut current_value = self.value.load(Ordering::Relaxed); loop { if current_value == 0 { - return false; + return WaitResult::Interrupted; } match self.value.compare_exchange_weak( @@ -89,7 +89,7 @@ impl Semaphore { Ordering::Release, Ordering::Relaxed, ) { - Ok(_) => return true, + Ok(_) => return WaitResult::Success, Err(v) => current_value = v, } } diff --git a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs index c83201e4c..40694a165 100644 --- a/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/condition_variable_tests.rs @@ -17,7 +17,9 @@ use std::{ }; use iceoryx2_bb_testing::{assert_that, watchdog::Watchdog}; -use iceoryx2_pal_concurrency_sync::{barrier::Barrier, condition_variable::*}; +use iceoryx2_pal_concurrency_sync::{ + barrier::Barrier, condition_variable::*, WaitAction, WaitResult, +}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -75,7 +77,7 @@ fn condition_variable_notify_one_unblocks_one() { for _ in 0..NUMBER_OF_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); + mtx.lock(|_, _| WaitAction::Continue); let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, @@ -85,13 +87,13 @@ fn condition_variable_notify_one_unblocks_one() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - false + WaitAction::Continue }, - |_, _| true, + |_, _| WaitAction::Continue, ); counter.fetch_add(1, Ordering::Relaxed); mtx.unlock(|_| {}); - assert_that!(wait_result, eq true); + assert_that!(wait_result, eq WaitResult::Success); }); } @@ -129,7 +131,7 @@ fn condition_variable_notify_all_unblocks_all() { for _ in 0..NUMBER_OF_THREADS { threads.push(s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); + mtx.lock(|_, _| WaitAction::Continue); let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, @@ -139,13 +141,13 @@ fn condition_variable_notify_all_unblocks_all() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - false + WaitAction::Continue }, - |_, _| true, + |_, _| WaitAction::Continue, ); counter.fetch_add(1, Ordering::Relaxed); mtx.unlock(|_| {}); - assert_that!(wait_result, eq true); + assert_that!(wait_result, eq WaitResult::Success); })); } @@ -183,7 +185,7 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { for _ in 0..NUMBER_OF_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - mtx.lock(|_, _| true); + mtx.lock(|_, _| WaitAction::Continue); let id = thread_in_wait.get_id(); let wait_result = sut.wait( &mtx, @@ -193,13 +195,13 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { while triggered_thread.load(Ordering::Relaxed) < 1 { spin_loop() } - false + WaitAction::Continue }, - |_, _| true, + |_, _| WaitAction::Continue, ); counter.fetch_add(1, Ordering::Relaxed); - assert_that!(wait_result, eq true); - assert_that!(mtx.try_lock(), eq false); + assert_that!(wait_result, eq WaitResult::Success); + assert_that!(mtx.try_lock(), eq WaitResult::Interrupted); // unlock thread since we own it mtx.unlock(|_| {}); }); @@ -224,7 +226,7 @@ fn condition_variable_mutex_is_locked_when_wait_returns() { fn condition_variable_wait_returns_false_when_functor_returns_false() { let sut = ConditionVariable::new(); let mtx = Mutex::new(); - mtx.lock(|_, _| true); - assert_that!(!sut.wait(&mtx, |_| {}, |_, _| false, |_, _| true), eq true); + mtx.lock(|_, _| WaitAction::Continue); + assert_that!(sut.wait(&mtx, |_| {}, |_, _| WaitAction::Abort, |_, _| WaitAction::Continue), eq WaitResult::Interrupted); mtx.unlock(|_| {}); } diff --git a/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs b/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs index fc76c5b3c..28f327d35 100644 --- a/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/mutex_tests.rs @@ -16,7 +16,7 @@ use std::{ }; use iceoryx2_bb_testing::assert_that; -use iceoryx2_pal_concurrency_sync::mutex::*; +use iceoryx2_pal_concurrency_sync::{mutex::*, WaitAction, WaitResult}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -29,7 +29,7 @@ fn mutex_lock_blocks() { sut.try_lock(); let t1 = s.spawn(|| { - sut.lock(|_, _| true); + sut.lock(|_, _| WaitAction::Continue); counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); }); @@ -57,15 +57,15 @@ fn mutex_lock_with_timeout_blocks() { let start = Instant::now(); while atomic.load(Ordering::Relaxed) == *value { if start.elapsed() > TIMEOUT * 2 { - return false; + return WaitAction::Abort; } } - true + WaitAction::Continue }); counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); - assert_that!(lock_result, eq true); + assert_that!(lock_result, eq WaitResult::Success); }); std::thread::sleep(TIMEOUT); @@ -84,14 +84,14 @@ fn mutex_lock_with_timeout_and_fails_after_timeout() { sut.try_lock(); - assert_that!(!sut.lock(|atomic, value| { + assert_that!(sut.lock(|atomic, value| { let start = Instant::now(); while atomic.load(Ordering::Relaxed) == *value { if start.elapsed() > TIMEOUT { - return false; + return WaitAction::Abort; } } - true - }), eq true); + WaitAction::Continue + }), eq WaitResult::Interrupted); } diff --git a/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs b/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs index 9564728ab..e3d5de82f 100644 --- a/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/rwlock_tests.rs @@ -16,7 +16,7 @@ use std::{ }; use iceoryx2_bb_testing::assert_that; -use iceoryx2_pal_concurrency_sync::{barrier::Barrier, rwlock::*}; +use iceoryx2_pal_concurrency_sync::{barrier::Barrier, rwlock::*, WaitAction, WaitResult}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -24,48 +24,48 @@ const TIMEOUT: Duration = Duration::from_millis(25); fn rwlock_reader_preference_try_write_lock_blocks_read_locks() { let sut = RwLockReaderPreference::new(); - assert_that!(sut.try_write_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] fn rwlock_reader_preference_multiple_read_locks_block_write_lock() { let sut = RwLockReaderPreference::new(); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] fn rwlock_reader_preference_write_lock_and_unlock_works() { let sut = RwLockReaderPreference::new(); - assert_that!(sut.write_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}); - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); - assert_that!(!sut.write_lock(|_, _| false), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}); - assert_that!(sut.write_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); } #[test] @@ -74,16 +74,16 @@ fn rwlock_reader_preference_try_read_lock_and_unlock_works() { let sut = RwLockReaderPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.try_read_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}); } - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); } #[test] @@ -92,16 +92,16 @@ fn rwlock_reader_preference_read_lock_and_unlock_works() { let sut = RwLockReaderPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false), eq true); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}); } - assert_that!(sut.write_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); } #[test] @@ -118,11 +118,11 @@ fn rwlock_reader_preference_read_lock_blocks_only_write_locks() { let write_counter = AtomicU32::new(0); std::thread::scope(|s| { - assert_that!(sut.try_read_lock(), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); for _ in 0..WRITE_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.write_lock(|_, _| true); + sut.write_lock(|_, _| WaitAction::Continue); write_counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); barrier_write.wait(|_, _| {}, |_| {}); @@ -132,7 +132,7 @@ fn rwlock_reader_preference_read_lock_blocks_only_write_locks() { for _ in 0..READ_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.read_lock(|_, _| true); + sut.read_lock(|_, _| WaitAction::Continue); read_counter.fetch_add(1, Ordering::Relaxed); barrier_read.wait(|_, _| {}, |_| {}); sut.unlock(|_| {}); @@ -171,11 +171,11 @@ fn rwlock_reader_preference_write_lock_blocks_everything() { let write_counter = AtomicU32::new(0); std::thread::scope(|s| { - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); for _ in 0..WRITE_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.write_lock(|_, _| true); + sut.write_lock(|_, _| WaitAction::Continue); let current_read_counter = read_counter.load(Ordering::Relaxed); write_counter.fetch_add(1, Ordering::Relaxed); std::thread::sleep(TIMEOUT); @@ -190,7 +190,7 @@ fn rwlock_reader_preference_write_lock_blocks_everything() { for _ in 0..READ_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.read_lock(|_, _| true); + sut.read_lock(|_, _| WaitAction::Continue); read_counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}); @@ -226,48 +226,48 @@ fn rwlock_reader_preference_write_lock_blocks_everything() { fn rwlock_writer_preference_try_write_lock_blocks_read_locks() { let sut = RwLockWriterPreference::new(); - assert_that!(sut.try_write_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] fn rwlock_writer_preference_multiple_read_locks_block_write_lock() { let sut = RwLockWriterPreference::new(); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.try_read_lock(), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); } #[test] fn rwlock_writer_preference_write_lock_and_unlock_works() { let sut = RwLockWriterPreference::new(); - assert_that!(sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Success); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}, |_| {}); - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); - assert_that!(!sut.try_read_lock(), eq true); - assert_that!(!sut.read_lock(|_, _| false), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); + assert_that!(sut.try_read_lock(), eq WaitResult::Interrupted); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.unlock(|_| {}, |_| {}); - assert_that!(sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Success); } #[test] @@ -276,16 +276,16 @@ fn rwlock_writer_preference_try_read_lock_and_unlock_works() { let sut = RwLockWriterPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.try_read_lock(), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.try_read_lock(), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}, |_| {}); } - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); } #[test] @@ -294,16 +294,16 @@ fn rwlock_writer_preference_read_lock_and_unlock_works() { let sut = RwLockWriterPreference::new(); for _ in 0..NUMBER_OF_READ_LOCKS { - assert_that!(sut.read_lock(|_, _| false), eq true); - assert_that!(!sut.try_write_lock(), eq true); - assert_that!(!sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.read_lock(|_, _| WaitAction::Abort), eq WaitResult::Success); + assert_that!(sut.try_write_lock(), eq WaitResult::Interrupted); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Interrupted); } for _ in 0..NUMBER_OF_READ_LOCKS { sut.unlock(|_| {}, |_| {}); } - assert_that!(sut.write_lock(|_, _| false, |_| {}, |_| {}), eq true); + assert_that!(sut.write_lock(|_, _| WaitAction::Abort, |_| {}, |_| {}), eq WaitResult::Success); } #[test] @@ -319,11 +319,11 @@ fn rwlock_writer_preference_write_lock_blocks_everything() { let write_counter = AtomicU32::new(0); std::thread::scope(|s| { - assert_that!(sut.try_write_lock(), eq true); + assert_that!(sut.try_write_lock(), eq WaitResult::Success); for _ in 0..WRITE_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.write_lock(|_, _| true, |_| {}, |_| {}); + sut.write_lock(|_, _| WaitAction::Continue, |_| {}, |_| {}); let current_read_counter = read_counter.load(Ordering::Relaxed); write_counter.fetch_add(1, Ordering::Relaxed); std::thread::sleep(TIMEOUT); @@ -338,7 +338,7 @@ fn rwlock_writer_preference_write_lock_blocks_everything() { for _ in 0..READ_THREADS { s.spawn(|| { barrier.wait(|_, _| {}, |_| {}); - sut.read_lock(|_, _| true); + sut.read_lock(|_, _| WaitAction::Continue); read_counter.fetch_add(1, Ordering::Relaxed); sut.unlock(|_| {}, |_| {}); diff --git a/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs b/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs index b02551389..97ca08425 100644 --- a/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs +++ b/iceoryx2-pal/concurrency-sync/tests/semaphore_tests.rs @@ -16,7 +16,7 @@ use std::{ }; use iceoryx2_bb_testing::assert_that; -use iceoryx2_pal_concurrency_sync::semaphore::*; +use iceoryx2_pal_concurrency_sync::{semaphore::*, WaitAction, WaitResult}; const TIMEOUT: Duration = Duration::from_millis(25); @@ -26,16 +26,16 @@ fn semaphore_post_and_try_wait_works() { let sut = Semaphore::new(initial_value); for _ in 0..initial_value { - assert_that!(sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Success); } - assert_that!(!sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Interrupted); sut.post(|_| {}, initial_value); for _ in 0..initial_value { - assert_that!(sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Success); } - assert_that!(!sut.try_wait(), eq true); + assert_that!(sut.try_wait(), eq WaitResult::Interrupted); } #[test] @@ -44,16 +44,16 @@ fn semaphore_post_and_wait_works() { let sut = Semaphore::new(initial_value); for _ in 0..initial_value { - assert_that!(sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Success); } - assert_that!(!sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); sut.post(|_| {}, initial_value); for _ in 0..initial_value { - assert_that!(sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Success); } - assert_that!(!sut.wait(|_, _| false), eq true); + assert_that!(sut.wait(|_, _| WaitAction::Abort), eq WaitResult::Interrupted); } #[test] @@ -64,7 +64,7 @@ fn semaphore_wait_blocks() { std::thread::scope(|s| { s.spawn(|| { - sut.wait(|_, _| true); + sut.wait(|_, _| WaitAction::Continue); counter.fetch_add(1, Ordering::Relaxed); });