From 4820cece7c54d824db4c6def5aa6323216dc07ed Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 11 Feb 2024 11:15:42 -0800 Subject: [PATCH 1/5] feat: Add an overflow push method In some cases it is desired to have a "lossy" queue for data. Such as an event queue where more recent events should be prioritized over older ones, where infinite storage is impractical. This commit adds a method called "force_push" which enables this usage. Bounded queue code is partially derived from the following commit: https://github.com/crossbeam-rs/crossbeam/commit/bd75c3c45edb78a731956c01458b75e5b69a8146 cc smol-rs/async-channel#44 Signed-off-by: John Nunley --- src/bounded.rs | 90 +++++++++++++++++++++++++++++++++++++--------- src/lib.rs | 74 ++++++++++++++++++++++++++++++++++++++ src/single.rs | 52 ++++++++++++++++++++++++++- src/sync.rs | 11 ------ tests/single.rs | 17 ++++++++- tests/unbounded.rs | 28 ++++++++++++++- 6 files changed, 241 insertions(+), 31 deletions(-) diff --git a/src/bounded.rs b/src/bounded.rs index 8e0b9f4..b6ace3d 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -7,7 +7,7 @@ use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::cell::UnsafeCell; #[allow(unused_imports)] use crate::sync::prelude::*; -use crate::{busy_wait, PopError, PushError}; +use crate::{busy_wait, PopError, PushError, ForcePushError}; /// A slot in a queue. struct Slot { @@ -83,6 +83,65 @@ impl Bounded { /// Attempts to push an item into the queue. pub fn push(&self, value: T) -> Result<(), PushError> { + self.push_or_else(value, |value, tail, _, _| { + let head = self.head.load(Ordering::Relaxed); + + // If the head lags one lap behind the tail as well... + if head.wrapping_add(self.one_lap) == tail { + // ...then the queue is full. + Err(PushError::Full(value)) + } else { + Ok(value) + } + }) + } + + /// Pushes an item into the queue, displacing another item if needed. + pub fn force_push(&self, value: T) -> Result, ForcePushError> { + let result = self.push_or_else(value, |value, tail, new_tail, slot| { + let head = tail.wrapping_sub(self.one_lap); + let new_head = new_tail.wrapping_sub(self.one_lap); + + // Try to move the head. + if self + .head + .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) + .is_ok() + { + // Move the tail. + self.tail.store(new_tail, Ordering::SeqCst); + + // Swap out the old value. + // SAFETY: We know this is initialized, since it's covered by the current queue. + let old = unsafe { + slot.value + .get() + .replace(MaybeUninit::new(value)) + .assume_init() + }; + + // Update the stamp. + slot.stamp.store(tail + 1, Ordering::Release); + + // Return a PushError. + Err(PushError::Full(old)) + } else { + Ok(value) + } + }); + + match result { + Ok(()) => Ok(None), + Err(PushError::Full(old_value)) => Ok(Some(old_value)), + Err(PushError::Closed(value)) => Err(ForcePushError(value)), + } + } + + /// Attempts to push an item into the queue, running a closure on failure. + fn push_or_else(&self, mut value: T, mut fail: F) -> Result<(), PushError> + where + F: FnMut(T, usize, usize, &Slot) -> Result>, + { let mut tail = self.tail.load(Ordering::Relaxed); loop { @@ -95,22 +154,23 @@ impl Bounded { let index = tail & (self.mark_bit - 1); let lap = tail & !(self.one_lap - 1); + // Calculate the new location of the tail. + let new_tail = if index + 1 < self.buffer.len() { + // Same lap, incremented index. + // Set to `{ lap: lap, mark: 0, index: index + 1 }`. + tail + 1 + } else { + // One lap forward, index wraps around to zero. + // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. + lap.wrapping_add(self.one_lap) + }; + // Inspect the corresponding slot. let slot = &self.buffer[index]; let stamp = slot.stamp.load(Ordering::Acquire); // If the tail and the stamp match, we may attempt to push. if tail == stamp { - let new_tail = if index + 1 < self.buffer.len() { - // Same lap, incremented index. - // Set to `{ lap: lap, mark: 0, index: index + 1 }`. - tail + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. - lap.wrapping_add(self.one_lap) - }; - // Try moving the tail. match self.tail.compare_exchange_weak( tail, @@ -132,13 +192,9 @@ impl Bounded { } } else if stamp.wrapping_add(self.one_lap) == tail + 1 { crate::full_fence(); - let head = self.head.load(Ordering::Relaxed); - // If the head lags one lap behind the tail as well... - if head.wrapping_add(self.one_lap) == tail { - // ...then the queue is full. - return Err(PushError::Full(value)); - } + // We've failed to push; run our failure closure. + value = fail(value, tail, new_tail, slot)?; // Loom complains if there isn't an explicit busy wait here. #[cfg(loom)] diff --git a/src/lib.rs b/src/lib.rs index f836d17..9ed6095 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -181,6 +181,54 @@ impl ConcurrentQueue { } } + /// Push an element into the queue, potentially displacing another element. + /// + /// Attempts to push an element into the queue. If the queue is full, one item from the + /// queue is replaced with the provided item. The displaced item is returned as `Some(T)`. + /// If the queue is closed, an error is returned. + /// + /// # Examples + /// + /// ``` + /// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError}; + /// + /// let q = ConcurrentQueue::bounded(3); + /// + /// // We can push to the queue. + /// for i in 1..=3 { + /// assert_eq!(q.force_push(i), Ok(None)); + /// } + /// + /// // Push errors because the queue is now full. + /// assert_eq!(q.push(4), Err(PushError::Full(4))); + /// + /// // Pushing a new value replaces the old ones. + /// assert_eq!(q.force_push(5), Ok(Some(1))); + /// assert_eq!(q.force_push(6), Ok(Some(2))); + /// + /// // Close the queue to stop further pushes. + /// q.close(); + /// + /// // Pushing will return an error. + /// assert_eq!(q.force_push(7), Err(ForcePushError(7))); + /// + /// // Popping items will return the force-pushed ones. + /// assert_eq!(q.pop(), Ok(3)); + /// assert_eq!(q.pop(), Ok(5)); + /// assert_eq!(q.pop(), Ok(6)); + /// ``` + pub fn force_push(&self, value: T) -> Result, ForcePushError> { + match &self.0 { + Inner::Single(q) => q.force_push(value), + Inner::Bounded(q) => q.force_push(value), + Inner::Unbounded(q) => match q.push(value) { + Ok(()) => Ok(None), + Err(PushError::Closed(value)) => Err(ForcePushError(value)), + Err(PushError::Full(_)) => unreachable!() + } + } + } + /// Attempts to pop an item from the queue. /// /// If the queue is empty, an error is returned. @@ -532,6 +580,32 @@ impl fmt::Display for PushError { } } +/// Error that occurs when force-pushing into a full queue. +#[derive(Clone, Copy, PartialEq, Eq)] +pub struct ForcePushError(pub T); + +impl ForcePushError { + /// Return the inner value that failed to be force-pushed. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl fmt::Debug for ForcePushError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("ForcePushError").field(&self.0).finish() + } +} + +impl fmt::Display for ForcePushError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Closed") + } +} + +#[cfg(feature = "std")] +impl error::Error for ForcePushError {} + /// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. #[inline] fn full_fence() { diff --git a/src/single.rs b/src/single.rs index 0efa0b7..7163601 100644 --- a/src/single.rs +++ b/src/single.rs @@ -1,10 +1,11 @@ use core::mem::MaybeUninit; +use core::ptr; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::cell::UnsafeCell; #[allow(unused_imports)] use crate::sync::prelude::*; -use crate::{busy_wait, PopError, PushError}; +use crate::{busy_wait, PopError, PushError, ForcePushError}; const LOCKED: usize = 1 << 0; const PUSHED: usize = 1 << 1; @@ -47,6 +48,55 @@ impl Single { } } + /// Attempts to push an item into the queue, displacing another if necessary. + pub fn force_push(&self, value: T) -> Result, ForcePushError> { + // Attempt to lock the slot. + let mut state = 0; + + loop { + // Lock the slot. + let prev = self + .state + .compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst) + .unwrap_or_else(|x| x); + + if prev & CLOSED != 0 { + return Err(ForcePushError(value)); + } + + if prev == state { + // Swap out the value. + // SAFETY: We have locked the state. + let prev_value = unsafe { + self.slot + .with_mut(move |slot| ptr::replace(slot, MaybeUninit::new(value))) + }; + + // We can unlock the slot now. + self.state.fetch_and(!LOCKED, Ordering::Release); + + // If the value was pushed, initialize it and return it. + let prev_value = if prev & PUSHED == 0 { + None + } else { + Some(unsafe { prev_value.assume_init() }) + }; + + // Unlock the slot. + return Ok(prev_value); + } + + // Try to go for the current (pushed) state. + if prev & LOCKED == 0 { + state = prev; + } else { + // State is locked. + busy_wait(); + state = prev & !LOCKED; + } + } + } + /// Attempts to pop an item from the queue. pub fn pop(&self) -> Result { let mut state = PUSHED; diff --git a/src/sync.rs b/src/sync.rs index 53238d0..d6c4f0a 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -62,10 +62,6 @@ pub(crate) mod prelude { pub(crate) trait UnsafeCellExt { type Value; - fn with(&self, f: F) -> R - where - F: FnOnce(*const Self::Value) -> R; - fn with_mut(&self, f: F) -> R where F: FnOnce(*mut Self::Value) -> R; @@ -74,13 +70,6 @@ pub(crate) mod prelude { impl UnsafeCellExt for cell::UnsafeCell { type Value = T; - fn with(&self, f: F) -> R - where - F: FnOnce(*const Self::Value) -> R, - { - f(self.get()) - } - fn with_mut(&self, f: F) -> R where F: FnOnce(*mut Self::Value) -> R, diff --git a/tests/single.rs b/tests/single.rs index 4dc1182..8d2a0d6 100644 --- a/tests/single.rs +++ b/tests/single.rs @@ -1,6 +1,6 @@ #![allow(clippy::bool_assert_comparison)] -use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError}; #[cfg(not(target_family = "wasm"))] use easy_parallel::Parallel; @@ -65,6 +65,21 @@ fn close() { assert_eq!(q.pop(), Err(PopError::Closed)); } +#[test] +fn force_push() { + let q = ConcurrentQueue::::bounded(1); + assert_eq!(q.force_push(10), Ok(None)); + + assert!(!q.is_closed()); + assert_eq!(q.force_push(20), Ok(Some(10))); + assert_eq!(q.force_push(30), Ok(Some(20))); + + assert!(q.close()); + assert_eq!(q.force_push(40), Err(ForcePushError(40))); + assert_eq!(q.pop(), Ok(30)); + assert_eq!(q.pop(), Err(PopError::Closed)); +} + #[cfg(not(target_family = "wasm"))] #[test] fn spsc() { diff --git a/tests/unbounded.rs b/tests/unbounded.rs index e95dc8c..6b211ab 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -1,6 +1,6 @@ #![allow(clippy::bool_assert_comparison)] -use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use concurrent_queue::{ConcurrentQueue, PopError, PushError, ForcePushError}; #[cfg(not(target_family = "wasm"))] use easy_parallel::Parallel; @@ -74,6 +74,32 @@ fn close() { assert_eq!(q.pop(), Err(PopError::Closed)); } +#[test] +fn force_push() { + let q = ConcurrentQueue::::bounded(5); + + for i in 1..=5 { + assert_eq!(q.force_push(i), Ok(None)); + } + + assert!(!q.is_closed()); + for i in 6..=10 { + assert_eq!(q.force_push(i), Ok(Some(i - 5))); + } + assert_eq!(q.pop(), Ok(6)); + assert_eq!(q.force_push(11), Ok(None)); + for i in 12..=15 { + assert_eq!(q.force_push(i), Ok(Some(i - 5))); + } + + assert!(q.close()); + assert_eq!(q.force_push(40), Err(ForcePushError(40))); + for i in 11..=15 { + assert_eq!(q.pop(), Ok(i)); + } + assert_eq!(q.pop(), Err(PopError::Closed)); +} + #[cfg(not(target_family = "wasm"))] #[test] fn spsc() { From 898ff46ac5bebbfbdef616a222dde4d70b48d2ec Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 11 Feb 2024 11:25:39 -0800 Subject: [PATCH 2/5] chore: Fix CI errors Signed-off-by: John Nunley --- src/bounded.rs | 6 ++---- src/lib.rs | 24 ++++++++++++------------ src/single.rs | 2 +- tests/unbounded.rs | 2 +- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/bounded.rs b/src/bounded.rs index b6ace3d..d7c831e 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -7,7 +7,7 @@ use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::cell::UnsafeCell; #[allow(unused_imports)] use crate::sync::prelude::*; -use crate::{busy_wait, PopError, PushError, ForcePushError}; +use crate::{busy_wait, ForcePushError, PopError, PushError}; /// A slot in a queue. struct Slot { @@ -115,9 +115,7 @@ impl Bounded { // SAFETY: We know this is initialized, since it's covered by the current queue. let old = unsafe { slot.value - .get() - .replace(MaybeUninit::new(value)) - .assume_init() + .with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init()) }; // Update the stamp. diff --git a/src/lib.rs b/src/lib.rs index 9ed6095..8b495ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -182,36 +182,36 @@ impl ConcurrentQueue { } /// Push an element into the queue, potentially displacing another element. - /// + /// /// Attempts to push an element into the queue. If the queue is full, one item from the /// queue is replaced with the provided item. The displaced item is returned as `Some(T)`. /// If the queue is closed, an error is returned. - /// + /// /// # Examples - /// + /// /// ``` /// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError}; - /// + /// /// let q = ConcurrentQueue::bounded(3); - /// + /// /// // We can push to the queue. /// for i in 1..=3 { /// assert_eq!(q.force_push(i), Ok(None)); /// } - /// + /// /// // Push errors because the queue is now full. /// assert_eq!(q.push(4), Err(PushError::Full(4))); - /// + /// /// // Pushing a new value replaces the old ones. /// assert_eq!(q.force_push(5), Ok(Some(1))); /// assert_eq!(q.force_push(6), Ok(Some(2))); - /// + /// /// // Close the queue to stop further pushes. /// q.close(); - /// + /// /// // Pushing will return an error. /// assert_eq!(q.force_push(7), Err(ForcePushError(7))); - /// + /// /// // Popping items will return the force-pushed ones. /// assert_eq!(q.pop(), Ok(3)); /// assert_eq!(q.pop(), Ok(5)); @@ -224,8 +224,8 @@ impl ConcurrentQueue { Inner::Unbounded(q) => match q.push(value) { Ok(()) => Ok(None), Err(PushError::Closed(value)) => Err(ForcePushError(value)), - Err(PushError::Full(_)) => unreachable!() - } + Err(PushError::Full(_)) => unreachable!(), + }, } } diff --git a/src/single.rs b/src/single.rs index 7163601..d699f05 100644 --- a/src/single.rs +++ b/src/single.rs @@ -5,7 +5,7 @@ use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::cell::UnsafeCell; #[allow(unused_imports)] use crate::sync::prelude::*; -use crate::{busy_wait, PopError, PushError, ForcePushError}; +use crate::{busy_wait, ForcePushError, PopError, PushError}; const LOCKED: usize = 1 << 0; const PUSHED: usize = 1 << 1; diff --git a/tests/unbounded.rs b/tests/unbounded.rs index 6b211ab..53ced9a 100644 --- a/tests/unbounded.rs +++ b/tests/unbounded.rs @@ -1,6 +1,6 @@ #![allow(clippy::bool_assert_comparison)] -use concurrent_queue::{ConcurrentQueue, PopError, PushError, ForcePushError}; +use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError}; #[cfg(not(target_family = "wasm"))] use easy_parallel::Parallel; From a12fcbd24530b5192bd2badd7e5aebc4aab2fdd8 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 23 Mar 2024 09:03:06 -0700 Subject: [PATCH 3/5] docs: Add more documentation to push_or_else Signed-off-by: John Nunley --- src/bounded.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/bounded.rs b/src/bounded.rs index d7c831e..72bcb03 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -136,6 +136,17 @@ impl Bounded { } /// Attempts to push an item into the queue, running a closure on failure. + /// + /// `fail` is run when there is no more room left in the tail of the queue. The parameters of + /// this function are as follows: + /// + /// - The item that failed to push. + /// - The value of `self.tail` before the new value would be inserted. + /// - The value of `self.tail` after the new value would be inserted. + /// - The slot that we attempted to push into. + /// + /// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise, + /// this function will return the error. fn push_or_else(&self, mut value: T, mut fail: F) -> Result<(), PushError> where F: FnMut(T, usize, usize, &Slot) -> Result>, From 7e8b7852e009242705982aeae97d16449a3d9947 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 23 Mar 2024 12:26:51 -0700 Subject: [PATCH 4/5] chore: fmt Signed-off-by: John Nunley --- src/bounded.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bounded.rs b/src/bounded.rs index 72bcb03..dab3a29 100644 --- a/src/bounded.rs +++ b/src/bounded.rs @@ -136,15 +136,15 @@ impl Bounded { } /// Attempts to push an item into the queue, running a closure on failure. - /// + /// /// `fail` is run when there is no more room left in the tail of the queue. The parameters of /// this function are as follows: - /// + /// /// - The item that failed to push. /// - The value of `self.tail` before the new value would be inserted. /// - The value of `self.tail` after the new value would be inserted. /// - The slot that we attempted to push into. - /// + /// /// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise, /// this function will return the error. fn push_or_else(&self, mut value: T, mut fail: F) -> Result<(), PushError> From 5a0de426443e4db71a9e629d4dbede2d7d2627b7 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Fri, 29 Mar 2024 20:45:22 -0700 Subject: [PATCH 5/5] docs: Fix comment Signed-off-by: John Nunley --- src/single.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/single.rs b/src/single.rs index d699f05..048f486 100644 --- a/src/single.rs +++ b/src/single.rs @@ -82,7 +82,7 @@ impl Single { Some(unsafe { prev_value.assume_init() }) }; - // Unlock the slot. + // Return the old value. return Ok(prev_value); }