Skip to content

Commit

Permalink
Implement MpMcQueueView
Browse files Browse the repository at this point in the history
  • Loading branch information
sosthene-nitrokey committed Jul 1, 2024
1 parent 9abec11 commit 7ff1470
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Added `String::drain`.
- Implemented `DoubleEndedIterator` for `OldestOrdered`.
- Added std `Entry` methods to indexmap `Entry`.
- Added `MpMcQueueView`, the `!Sized` version of `MpMcQueue`.

### Changed

Expand Down
80 changes: 69 additions & 11 deletions src/mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ use portable_atomic as atomic;

use atomic::Ordering;

use crate::storage::{OwnedStorage, Storage, ViewStorage};

#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
Expand Down Expand Up @@ -128,17 +130,27 @@ pub type Q32<T> = MpMcQueue<T, 32>;
/// MPMC queue with a capability for 64 elements.
pub type Q64<T> = MpMcQueue<T, 64>;

/// MPMC queue with a capacity for N elements
/// N must be a power of 2
/// The max value of N is u8::MAX - 1 if `mpmc_large` feature is not enabled.
pub struct MpMcQueue<T, const N: usize> {
buffer: UnsafeCell<[Cell<T>; N]>,
/// Base struct for [`MpMcQueue`] and [`MpMcQueueView`], generic over the [`Storage`].
///
/// In most cases you should use [`MpMcQueue`] or [`MpMcQueueView`] directly. Only use this
/// struct if you want to write code that's generic over both.
pub struct MpMcQueueInner<T, S: Storage> {
dequeue_pos: AtomicTargetSize,
enqueue_pos: AtomicTargetSize,
buffer: UnsafeCell<S::Buffer<Cell<T>>>,
}

/// MPMC queue with a capacity for N elements
/// N must be a power of 2
/// The max value of N is u8::MAX - 1 if `mpmc_large` feature is not enabled.
pub type MpMcQueue<T, const N: usize> = MpMcQueueInner<T, OwnedStorage<N>>;

/// MPMC queue with a capacity for N elements
/// N must be a power of 2
/// The max value of N is u8::MAX - 1 if `mpmc_large` feature is not enabled.
pub type MpMcQueueView<T> = MpMcQueueInner<T, ViewStorage>;

impl<T, const N: usize> MpMcQueue<T, N> {
const MASK: UintSize = (N - 1) as UintSize;
const EMPTY_CELL: Cell<T> = Cell::new(0);

const ASSERT: [(); 1] = [()];
Expand Down Expand Up @@ -167,10 +179,56 @@ impl<T, const N: usize> MpMcQueue<T, N> {
enqueue_pos: AtomicTargetSize::new(0),
}
}
/// Get a reference to the `MpMcQueue`, erasing the `N` const-generic.
///
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView};
/// let queue: MpMcQueue<u8, 2> = MpMcQueue::new();
/// let view: &MpMcQueueView<u8> = queue.as_view();
/// ```
///
/// It is often preferable to do the same through type coerction, since `MpMcQueue<T, N>` implements `Unsize<MpMcQueueView<T>>`:
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView};
/// let queue: MpMcQueue<u8, 2> = MpMcQueue::new();
/// let view: &MpMcQueueView<u8> = &queue;
/// ```
#[inline]
pub const fn as_view(&self) -> &MpMcQueueView<T> {
self
}

/// Get a mutable reference to the `MpMcQueue`, erasing the `N` const-generic.
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView};
/// let mut queue: MpMcQueue<u8, 2> = MpMcQueue::new();
/// let view: &mut MpMcQueueView<u8> = queue.as_mut_view();
/// ```
///
/// It is often preferable to do the same through type coerction, since `MpMcQueue<T, N>` implements `Unsize<MpMcQueueView<T>>`:
///
/// ```rust
/// # use heapless::mpmc::{MpMcQueue, MpMcQueueView};
/// let mut queue: MpMcQueue<u8, 2> = MpMcQueue::new();
/// let view: &mut MpMcQueueView<u8> = &mut queue;
/// ```
#[inline]
pub fn as_mut_view(&mut self) -> &mut MpMcQueueView<T> {
self
}
}

impl<T, S: Storage> MpMcQueueInner<T, S> {
fn mask(&self) -> UintSize {
(S::len(self.buffer.get()) - 1) as _
}

/// Returns the item in the front of the queue, or `None` if the queue is empty
pub fn dequeue(&self) -> Option<T> {
unsafe { dequeue(self.buffer.get() as *mut _, &self.dequeue_pos, Self::MASK) }
unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
}

/// Adds an `item` to the end of the queue
Expand All @@ -179,9 +237,9 @@ impl<T, const N: usize> MpMcQueue<T, N> {
pub fn enqueue(&self, item: T) -> Result<(), T> {
unsafe {
enqueue(
self.buffer.get() as *mut _,
S::as_ptr(self.buffer.get()),
&self.enqueue_pos,
Self::MASK,
self.mask(),
item,
)
}
Expand All @@ -194,14 +252,14 @@ impl<T, const N: usize> Default for MpMcQueue<T, N> {
}
}

impl<T, const N: usize> Drop for MpMcQueue<T, N> {
impl<T, S: Storage> Drop for MpMcQueueInner<T, S> {
fn drop(&mut self) {
// drop all contents currently in the queue
while self.dequeue().is_some() {}
}
}

unsafe impl<T, const N: usize> Sync for MpMcQueue<T, N> where T: Send {}
unsafe impl<T, S: Storage> Sync for MpMcQueueInner<T, S> where T: Send {}

struct Cell<T> {
data: MaybeUninit<T>,
Expand Down
23 changes: 23 additions & 0 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use core::borrow::{Borrow, BorrowMut};

pub(crate) trait SealedStorage {
type Buffer<T>: ?Sized + Borrow<[T]> + BorrowMut<[T]>;
/// Obtain the length of the buffer
fn len<T>(this: *const Self::Buffer<T>) -> usize;
/// Obtain access to the first element of the buffer
fn as_ptr<T>(this: *mut Self::Buffer<T>) -> *mut T;
}

/// Trait defining how data for a container is stored.
Expand Down Expand Up @@ -33,11 +37,30 @@ pub enum OwnedStorage<const N: usize> {}
impl<const N: usize> Storage for OwnedStorage<N> {}
impl<const N: usize> SealedStorage for OwnedStorage<N> {
type Buffer<T> = [T; N];
fn len<T>(_: *const Self::Buffer<T>) -> usize {
N
}
fn as_ptr<T>(this: *mut Self::Buffer<T>) -> *mut T {
this as _
}
}

/// Implementation of [`Storage`] that stores the data in an unsized `[T]`.
pub enum ViewStorage {}
impl Storage for ViewStorage {}
impl SealedStorage for ViewStorage {
type Buffer<T> = [T];
fn len<T>(this: *const Self::Buffer<T>) -> usize {
// We get the len of the buffer. There is no reverse method of `core::ptr::from_raw_parts`, so we have to work around it.
let ptr: *const [()] = this as _;
// SAFETY: There is no aliasing as () is zero-sized
let slice: &[()] = unsafe { &*ptr };
let len = slice.len();

len - 1
}

fn as_ptr<T>(this: *mut Self::Buffer<T>) -> *mut T {
this as _
}
}

0 comments on commit 7ff1470

Please sign in to comment.