Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 24 additions & 30 deletions tokio/src/runtime/scheduler/multi_thread/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,19 @@ pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
impl<T> Local<T> {
/// Returns the number of entries in the queue
pub(crate) fn len(&self) -> usize {
self.inner.len() as usize
let (_, head) = unpack(self.inner.head.load(Acquire));
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };
len(head, tail)
}

/// How many tasks can be pushed into the queue
pub(crate) fn remaining_slots(&self) -> usize {
self.inner.remaining_slots()
let (steal, _) = unpack(self.inner.head.load(Acquire));
// safety: this is the **only** thread that updates this cell.
let tail = unsafe { self.inner.tail.unsync_load() };

LOCAL_QUEUE_CAPACITY - len(steal, tail)
}

pub(crate) fn max_capacity(&self) -> usize {
Expand All @@ -124,7 +131,7 @@ impl<T> Local<T> {
/// Separate to `is_stealable` so that refactors of `is_stealable` to "protect"
/// some tasks from stealing won't affect this
pub(crate) fn has_tasks(&self) -> bool {
!self.inner.is_empty()
self.len() != 0
}

/// Pushes a batch of tasks to the back of the queue. All tasks must fit in
Expand Down Expand Up @@ -384,8 +391,17 @@ impl<T> Local<T> {
}

impl<T> Steal<T> {
/// Returns the number of entries in the queue
pub(crate) fn len(&self) -> usize {
let (_, head) = unpack(self.0.head.load(Acquire));
let tail = self.0.tail.load(Acquire);
len(head, tail)
}

/// Return true if the queue is empty,
/// false if there are any entries in the queue
pub(crate) fn is_empty(&self) -> bool {
self.0.is_empty()
self.len() == 0
}

/// Steals half the tasks from self and place them into `dst`.
Expand Down Expand Up @@ -543,14 +559,6 @@ impl<T> Steal<T> {
}
}

cfg_unstable_metrics! {
impl<T> Steal<T> {
pub(crate) fn len(&self) -> usize {
self.0.len() as _
}
}
}

impl<T> Clone for Steal<T> {
fn clone(&self) -> Steal<T> {
Steal(self.0.clone())
Expand All @@ -565,24 +573,10 @@ impl<T> Drop for Local<T> {
}
}

impl<T> Inner<T> {
fn remaining_slots(&self) -> usize {
let (steal, _) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);

LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
}

fn len(&self) -> UnsignedShort {
let (_, head) = unpack(self.head.load(Acquire));
let tail = self.tail.load(Acquire);

tail.wrapping_sub(head)
}

fn is_empty(&self) -> bool {
self.len() == 0
}
/// Calculate the length of the queue using the head and tail.
/// The `head` can be the `steal` or `real` head.
fn len(head: UnsignedShort, tail: UnsignedShort) -> usize {
tail.wrapping_sub(head) as usize
}

/// Split the head value into the real head and the index a stealer is working
Expand Down
Loading