Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an overflow push method #58

Merged
merged 5 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 71 additions & 17 deletions src/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};
use crate::{busy_wait, ForcePushError, PopError, PushError};

/// A slot in a queue.
struct Slot<T> {
Expand Down Expand Up @@ -83,6 +83,63 @@ impl<T> Bounded<T> {

/// Attempts to push an item into the queue.
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
self.push_or_else(value, |value, tail, _, _| {
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
Err(PushError::Full(value))
} else {
Ok(value)
}
})
}

/// Pushes an item into the queue, displacing another item if needed.
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
let result = self.push_or_else(value, |value, tail, new_tail, slot| {
let head = tail.wrapping_sub(self.one_lap);
let new_head = new_tail.wrapping_sub(self.one_lap);

// Try to move the head.
if self
.head
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
// Move the tail.
self.tail.store(new_tail, Ordering::SeqCst);

// Swap out the old value.
// SAFETY: We know this is initialized, since it's covered by the current queue.
let old = unsafe {
slot.value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll ask for a small clarification: why is the slot the correct one to put the new value inside? (in contrast to a slot derived from the new_tail)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slot is the old tail, which means it's the last unfilled slot.

.with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init())
};

// Update the stamp.
slot.stamp.store(tail + 1, Ordering::Release);

// Return a PushError.
Err(PushError::Full(old))
} else {
Ok(value)
}
});

match result {
Ok(()) => Ok(None),
Err(PushError::Full(old_value)) => Ok(Some(old_value)),
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
}
}

/// Attempts to push an item into the queue, running a closure on failure.
fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>>
notgull marked this conversation as resolved.
Show resolved Hide resolved
where
F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>,
{
let mut tail = self.tail.load(Ordering::Relaxed);

loop {
Expand All @@ -95,22 +152,23 @@ impl<T> Bounded<T> {
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);

// Calculate the new location of the tail.
let new_tail = if index + 1 < self.buffer.len() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Inspect the corresponding slot.
let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire);

// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.buffer.len() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
Expand All @@ -132,13 +190,9 @@ impl<T> Bounded<T> {
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this work with tail + 1 and not use something like what's done for new_tail?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done in order to tell if the stamp has advanced to where the tail would logically be, as opposed to where the tail actually is.

I'm not 100% clear on the logic here; this was written long before I joined smol-rs, and I think it was originally part of crossbeam.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, it mght be a good idea to maybe investigate and properly document this sometime (not even necessarily inside of the code, a separate tex document with diagrams of this might be a good idea). Maybe I find time to do that (but unlikely in the next few months because I need to time to write a Bachelor thesis).

crate::full_fence();
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
return Err(PushError::Full(value));
}
// We've failed to push; run our failure closure.
value = fail(value, tail, new_tail, slot)?;

// Loom complains if there isn't an explicit busy wait here.
#[cfg(loom)]
Expand Down
74 changes: 74 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,54 @@ impl<T> ConcurrentQueue<T> {
}
}

/// Push an element into the queue, potentially displacing another element.
///
/// Attempts to push an element into the queue. If the queue is full, one item from the
/// queue is replaced with the provided item. The displaced item is returned as `Some(T)`.
/// If the queue is closed, an error is returned.
///
/// # Examples
///
/// ```
/// use concurrent_queue::{ConcurrentQueue, ForcePushError, PushError};
///
/// let q = ConcurrentQueue::bounded(3);
///
/// // We can push to the queue.
/// for i in 1..=3 {
/// assert_eq!(q.force_push(i), Ok(None));
/// }
///
/// // Push errors because the queue is now full.
/// assert_eq!(q.push(4), Err(PushError::Full(4)));
///
/// // Pushing a new value replaces the old ones.
/// assert_eq!(q.force_push(5), Ok(Some(1)));
/// assert_eq!(q.force_push(6), Ok(Some(2)));
///
/// // Close the queue to stop further pushes.
/// q.close();
///
/// // Pushing will return an error.
/// assert_eq!(q.force_push(7), Err(ForcePushError(7)));
///
/// // Popping items will return the force-pushed ones.
/// assert_eq!(q.pop(), Ok(3));
/// assert_eq!(q.pop(), Ok(5));
/// assert_eq!(q.pop(), Ok(6));
/// ```
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
match &self.0 {
Inner::Single(q) => q.force_push(value),
Inner::Bounded(q) => q.force_push(value),
Inner::Unbounded(q) => match q.push(value) {
Ok(()) => Ok(None),
Err(PushError::Closed(value)) => Err(ForcePushError(value)),
Err(PushError::Full(_)) => unreachable!(),
},
}
}

/// Attempts to pop an item from the queue.
///
/// If the queue is empty, an error is returned.
Expand Down Expand Up @@ -532,6 +580,32 @@ impl<T> fmt::Display for PushError<T> {
}
}

/// Error that occurs when force-pushing into a full queue.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct ForcePushError<T>(pub T);

impl<T> ForcePushError<T> {
/// Return the inner value that failed to be force-pushed.
pub fn into_inner(self) -> T {
self.0
}
}

impl<T: fmt::Debug> fmt::Debug for ForcePushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ForcePushError").field(&self.0).finish()
}
}

impl<T> fmt::Display for ForcePushError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Closed")
}
}

#[cfg(feature = "std")]
impl<T: fmt::Debug> error::Error for ForcePushError<T> {}

/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
Expand Down
52 changes: 51 additions & 1 deletion src/single.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use core::mem::MaybeUninit;
use core::ptr;

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};
use crate::{busy_wait, ForcePushError, PopError, PushError};

const LOCKED: usize = 1 << 0;
const PUSHED: usize = 1 << 1;
Expand Down Expand Up @@ -47,6 +48,55 @@ impl<T> Single<T> {
}
}

/// Attempts to push an item into the queue, displacing another if necessary.
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> {
// Attempt to lock the slot.
let mut state = 0;

loop {
// Lock the slot.
let prev = self
.state
.compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst)
.unwrap_or_else(|x| x);

if prev & CLOSED != 0 {
return Err(ForcePushError(value));
}

if prev == state {
// Swap out the value.
// SAFETY: We have locked the state.
let prev_value = unsafe {
self.slot
.with_mut(move |slot| ptr::replace(slot, MaybeUninit::new(value)))
};

// We can unlock the slot now.
self.state.fetch_and(!LOCKED, Ordering::Release);

// If the value was pushed, initialize it and return it.
let prev_value = if prev & PUSHED == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think by processing this before the fetch_and, the cost of moving the previous value when PUSHED == 0 can be avoided. (The compiler may be able to handle it well in release mode, but at least it would be useful in debug mode when the element is costly to move, such as large arrays.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #71

None
} else {
Some(unsafe { prev_value.assume_init() })
};

// Unlock the slot.
notgull marked this conversation as resolved.
Show resolved Hide resolved
return Ok(prev_value);
}

// Try to go for the current (pushed) state.
if prev & LOCKED == 0 {
state = prev;
} else {
// State is locked.
busy_wait();
state = prev & !LOCKED;
}
}
}

/// Attempts to pop an item from the queue.
pub fn pop(&self) -> Result<T, PopError> {
let mut state = PUSHED;
Expand Down
11 changes: 0 additions & 11 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ pub(crate) mod prelude {
pub(crate) trait UnsafeCellExt {
type Value;

fn with<R, F>(&self, f: F) -> R
where
F: FnOnce(*const Self::Value) -> R;

fn with_mut<R, F>(&self, f: F) -> R
where
F: FnOnce(*mut Self::Value) -> R;
Expand All @@ -74,13 +70,6 @@ pub(crate) mod prelude {
impl<T> UnsafeCellExt for cell::UnsafeCell<T> {
type Value = T;

fn with<R, F>(&self, f: F) -> R
where
F: FnOnce(*const Self::Value) -> R,
{
f(self.get())
}

fn with_mut<R, F>(&self, f: F) -> R
where
F: FnOnce(*mut Self::Value) -> R,
Expand Down
17 changes: 16 additions & 1 deletion tests/single.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::bool_assert_comparison)]

use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};

#[cfg(not(target_family = "wasm"))]
use easy_parallel::Parallel;
Expand Down Expand Up @@ -65,6 +65,21 @@ fn close() {
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[test]
fn force_push() {
let q = ConcurrentQueue::<i32>::bounded(1);
assert_eq!(q.force_push(10), Ok(None));

assert!(!q.is_closed());
assert_eq!(q.force_push(20), Ok(Some(10)));
assert_eq!(q.force_push(30), Ok(Some(20)));

assert!(q.close());
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
assert_eq!(q.pop(), Ok(30));
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc() {
Expand Down
28 changes: 27 additions & 1 deletion tests/unbounded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(clippy::bool_assert_comparison)]

use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};

#[cfg(not(target_family = "wasm"))]
use easy_parallel::Parallel;
Expand Down Expand Up @@ -74,6 +74,32 @@ fn close() {
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[test]
fn force_push() {
let q = ConcurrentQueue::<i32>::bounded(5);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a test for bounded queue, but the test file is tests/unbounded.rs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #68


for i in 1..=5 {
assert_eq!(q.force_push(i), Ok(None));
}

assert!(!q.is_closed());
for i in 6..=10 {
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
}
assert_eq!(q.pop(), Ok(6));
assert_eq!(q.force_push(11), Ok(None));
for i in 12..=15 {
assert_eq!(q.force_push(i), Ok(Some(i - 5)));
}

assert!(q.close());
assert_eq!(q.force_push(40), Err(ForcePushError(40)));
for i in 11..=15 {
assert_eq!(q.pop(), Ok(i));
}
assert_eq!(q.pop(), Err(PopError::Closed));
}

#[cfg(not(target_family = "wasm"))]
#[test]
fn spsc() {
Expand Down
Loading