-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an overflow push method #58
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ use crate::sync::atomic::{AtomicUsize, Ordering}; | |
use crate::sync::cell::UnsafeCell; | ||
#[allow(unused_imports)] | ||
use crate::sync::prelude::*; | ||
use crate::{busy_wait, PopError, PushError}; | ||
use crate::{busy_wait, ForcePushError, PopError, PushError}; | ||
|
||
/// A slot in a queue. | ||
struct Slot<T> { | ||
|
@@ -83,6 +83,74 @@ impl<T> Bounded<T> { | |
|
||
/// Attempts to push an item into the queue. | ||
pub fn push(&self, value: T) -> Result<(), PushError<T>> { | ||
self.push_or_else(value, |value, tail, _, _| { | ||
let head = self.head.load(Ordering::Relaxed); | ||
|
||
// If the head lags one lap behind the tail as well... | ||
if head.wrapping_add(self.one_lap) == tail { | ||
// ...then the queue is full. | ||
Err(PushError::Full(value)) | ||
} else { | ||
Ok(value) | ||
} | ||
}) | ||
} | ||
|
||
/// Pushes an item into the queue, displacing another item if needed. | ||
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> { | ||
let result = self.push_or_else(value, |value, tail, new_tail, slot| { | ||
let head = tail.wrapping_sub(self.one_lap); | ||
let new_head = new_tail.wrapping_sub(self.one_lap); | ||
|
||
// Try to move the head. | ||
if self | ||
.head | ||
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed) | ||
.is_ok() | ||
{ | ||
// Move the tail. | ||
self.tail.store(new_tail, Ordering::SeqCst); | ||
|
||
// Swap out the old value. | ||
// SAFETY: We know this is initialized, since it's covered by the current queue. | ||
let old = unsafe { | ||
slot.value | ||
.with_mut(|slot| slot.replace(MaybeUninit::new(value)).assume_init()) | ||
}; | ||
|
||
// Update the stamp. | ||
slot.stamp.store(tail + 1, Ordering::Release); | ||
|
||
// Return a PushError. | ||
Err(PushError::Full(old)) | ||
} else { | ||
Ok(value) | ||
} | ||
}); | ||
|
||
match result { | ||
Ok(()) => Ok(None), | ||
Err(PushError::Full(old_value)) => Ok(Some(old_value)), | ||
Err(PushError::Closed(value)) => Err(ForcePushError(value)), | ||
} | ||
} | ||
|
||
/// Attempts to push an item into the queue, running a closure on failure. | ||
/// | ||
/// `fail` is run when there is no more room left in the tail of the queue. The parameters of | ||
/// this function are as follows: | ||
/// | ||
/// - The item that failed to push. | ||
/// - The value of `self.tail` before the new value would be inserted. | ||
/// - The value of `self.tail` after the new value would be inserted. | ||
/// - The slot that we attempted to push into. | ||
/// | ||
/// If `fail` returns `Ok(val)`, we will try pushing `val` to the head of the queue. Otherwise, | ||
/// this function will return the error. | ||
fn push_or_else<F>(&self, mut value: T, mut fail: F) -> Result<(), PushError<T>> | ||
notgull marked this conversation as resolved.
Show resolved
Hide resolved
|
||
where | ||
F: FnMut(T, usize, usize, &Slot<T>) -> Result<T, PushError<T>>, | ||
{ | ||
let mut tail = self.tail.load(Ordering::Relaxed); | ||
|
||
loop { | ||
|
@@ -95,22 +163,23 @@ impl<T> Bounded<T> { | |
let index = tail & (self.mark_bit - 1); | ||
let lap = tail & !(self.one_lap - 1); | ||
|
||
// Calculate the new location of the tail. | ||
let new_tail = if index + 1 < self.buffer.len() { | ||
// Same lap, incremented index. | ||
// Set to `{ lap: lap, mark: 0, index: index + 1 }`. | ||
tail + 1 | ||
} else { | ||
// One lap forward, index wraps around to zero. | ||
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. | ||
lap.wrapping_add(self.one_lap) | ||
}; | ||
|
||
// Inspect the corresponding slot. | ||
let slot = &self.buffer[index]; | ||
let stamp = slot.stamp.load(Ordering::Acquire); | ||
|
||
// If the tail and the stamp match, we may attempt to push. | ||
if tail == stamp { | ||
let new_tail = if index + 1 < self.buffer.len() { | ||
// Same lap, incremented index. | ||
// Set to `{ lap: lap, mark: 0, index: index + 1 }`. | ||
tail + 1 | ||
} else { | ||
// One lap forward, index wraps around to zero. | ||
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`. | ||
lap.wrapping_add(self.one_lap) | ||
}; | ||
|
||
// Try moving the tail. | ||
match self.tail.compare_exchange_weak( | ||
tail, | ||
|
@@ -132,13 +201,9 @@ impl<T> Bounded<T> { | |
} | ||
} else if stamp.wrapping_add(self.one_lap) == tail + 1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this work with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is done in order to tell if the stamp has advanced to where the tail would logically be, as opposed to where the tail actually is. I'm not 100% clear on the logic here; this was written long before I joined smol-rs, and I think it was originally part of crossbeam. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, it mght be a good idea to maybe investigate and properly document this sometime (not even necessarily inside of the code, a separate tex document with diagrams of this might be a good idea). Maybe I find time to do that (but unlikely in the next few months because I need to time to write a Bachelor thesis). |
||
crate::full_fence(); | ||
let head = self.head.load(Ordering::Relaxed); | ||
|
||
// If the head lags one lap behind the tail as well... | ||
if head.wrapping_add(self.one_lap) == tail { | ||
// ...then the queue is full. | ||
return Err(PushError::Full(value)); | ||
} | ||
// We've failed to push; run our failure closure. | ||
value = fail(value, tail, new_tail, slot)?; | ||
|
||
// Loom complains if there isn't an explicit busy wait here. | ||
#[cfg(loom)] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,11 @@ | ||
use core::mem::MaybeUninit; | ||
use core::ptr; | ||
|
||
use crate::sync::atomic::{AtomicUsize, Ordering}; | ||
use crate::sync::cell::UnsafeCell; | ||
#[allow(unused_imports)] | ||
use crate::sync::prelude::*; | ||
use crate::{busy_wait, PopError, PushError}; | ||
use crate::{busy_wait, ForcePushError, PopError, PushError}; | ||
|
||
const LOCKED: usize = 1 << 0; | ||
const PUSHED: usize = 1 << 1; | ||
|
@@ -47,6 +48,55 @@ impl<T> Single<T> { | |
} | ||
} | ||
|
||
/// Attempts to push an item into the queue, displacing another if necessary. | ||
pub fn force_push(&self, value: T) -> Result<Option<T>, ForcePushError<T>> { | ||
// Attempt to lock the slot. | ||
let mut state = 0; | ||
|
||
loop { | ||
// Lock the slot. | ||
let prev = self | ||
.state | ||
.compare_exchange(state, LOCKED | PUSHED, Ordering::SeqCst, Ordering::SeqCst) | ||
.unwrap_or_else(|x| x); | ||
|
||
if prev & CLOSED != 0 { | ||
return Err(ForcePushError(value)); | ||
} | ||
|
||
if prev == state { | ||
// Swap out the value. | ||
// SAFETY: We have locked the state. | ||
let prev_value = unsafe { | ||
self.slot | ||
.with_mut(move |slot| ptr::replace(slot, MaybeUninit::new(value))) | ||
}; | ||
|
||
// We can unlock the slot now. | ||
self.state.fetch_and(!LOCKED, Ordering::Release); | ||
|
||
// If the value was pushed, initialize it and return it. | ||
let prev_value = if prev & PUSHED == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think by processing this before the fetch_and, the cost of moving the previous value when PUSHED == 0 can be avoided. (The compiler may be able to handle it well in release mode, but at least it would be useful in debug mode when the element is costly to move, such as large arrays.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in #71 |
||
None | ||
} else { | ||
Some(unsafe { prev_value.assume_init() }) | ||
}; | ||
|
||
// Return the old value. | ||
return Ok(prev_value); | ||
} | ||
|
||
// Try to go for the current (pushed) state. | ||
if prev & LOCKED == 0 { | ||
state = prev; | ||
} else { | ||
// State is locked. | ||
busy_wait(); | ||
state = prev & !LOCKED; | ||
} | ||
} | ||
} | ||
|
||
/// Attempts to pop an item from the queue. | ||
pub fn pop(&self) -> Result<T, PopError> { | ||
let mut state = PUSHED; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
#![allow(clippy::bool_assert_comparison)] | ||
|
||
use concurrent_queue::{ConcurrentQueue, PopError, PushError}; | ||
use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError}; | ||
|
||
#[cfg(not(target_family = "wasm"))] | ||
use easy_parallel::Parallel; | ||
|
@@ -74,6 +74,32 @@ fn close() { | |
assert_eq!(q.pop(), Err(PopError::Closed)); | ||
} | ||
|
||
#[test] | ||
fn force_push() { | ||
let q = ConcurrentQueue::<i32>::bounded(5); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a test for bounded queue, but the test file is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in #68 |
||
|
||
for i in 1..=5 { | ||
assert_eq!(q.force_push(i), Ok(None)); | ||
} | ||
|
||
assert!(!q.is_closed()); | ||
for i in 6..=10 { | ||
assert_eq!(q.force_push(i), Ok(Some(i - 5))); | ||
} | ||
assert_eq!(q.pop(), Ok(6)); | ||
assert_eq!(q.force_push(11), Ok(None)); | ||
for i in 12..=15 { | ||
assert_eq!(q.force_push(i), Ok(Some(i - 5))); | ||
} | ||
|
||
assert!(q.close()); | ||
assert_eq!(q.force_push(40), Err(ForcePushError(40))); | ||
for i in 11..=15 { | ||
assert_eq!(q.pop(), Ok(i)); | ||
} | ||
assert_eq!(q.pop(), Err(PopError::Closed)); | ||
} | ||
|
||
#[cfg(not(target_family = "wasm"))] | ||
#[test] | ||
fn spsc() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll ask for a small clarification: why is the
slot
the correct one to put the new value inside? (in contrast to a slot derived from thenew_tail
)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slot
is the oldtail
, which means it's the last unfilled slot.