Skip to content

Commit

Permalink
feat: add task counter pairs
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Jun 9, 2024
1 parent 53b586c commit bb82406
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 12 deletions.
38 changes: 35 additions & 3 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,15 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
#[deprecated = "Renamed to num_active_tasks"]
/// Renamed to [`RuntimeMetrics::num_active_tasks`]
pub fn active_tasks_count(&self) -> usize {
self.num_active_tasks()
}

/// Returns the current number of active tasks in the runtime.
///
/// This value increases and decreases over time as tasks are spawned and as they are completed or cancelled.
///
/// # Examples
///
Expand All @@ -86,14 +94,38 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// let n = metrics.num_active_tasks();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
pub fn num_active_tasks(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the number of tasks spawned in this runtime since it was created.
///
/// This count starts at zero when the runtime is created and increases by one each time a task is spawned.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.spawned_tasks_count();
/// println!("Runtime has had {} tasks spawned", n);
/// }
/// ```
pub fn spawned_tasks_count(&self) -> u64 {
self.handle.inner.spawned_tasks_count()
}

/// Returns the number of idle threads, which have spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,10 @@ cfg_unstable_metrics! {
pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ impl<S: 'static> OwnedTasks<S> {
self.list.len()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.list.added()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
// If the task's owner ID is `None` then it is not part of any list and
// doesn't need removing.
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/util/sharded_list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::ptr::NonNull;
use std::sync::atomic::Ordering;

use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::{Mutex, MutexGuard};
use std::sync::atomic::AtomicUsize;

Expand All @@ -14,6 +15,7 @@ use super::linked_list::{Link, LinkedList};
/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
pub(crate) struct ShardedList<L, T> {
lists: Box<[Mutex<LinkedList<L, T>>]>,
added: AtomicU64,
count: AtomicUsize,
shard_mask: usize,
}
Expand Down Expand Up @@ -42,6 +44,7 @@ impl<L, T> ShardedList<L, T> {
}
Self {
lists: lists.into_boxed_slice(),
added: AtomicU64::new(0),
count: AtomicUsize::new(0),
shard_mask,
}
Expand All @@ -51,6 +54,7 @@ impl<L, T> ShardedList<L, T> {
/// Used to get the lock of shard.
pub(crate) struct ShardGuard<'a, L, T> {
lock: MutexGuard<'a, LinkedList<L, T>>,
added: &'a AtomicU64,
count: &'a AtomicUsize,
id: usize,
}
Expand Down Expand Up @@ -92,6 +96,7 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(val)) };
ShardGuard {
lock: self.shard_inner(id),
added: &self.added,
count: &self.count,
id,
}
Expand All @@ -102,6 +107,11 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
self.count.load(Ordering::Relaxed)
}

/// Gets the total number of elements added to this list.
pub(crate) fn added(&self) -> u64 {
self.added.load(Ordering::Relaxed)
}

/// Returns whether the linked list does not contain any node.
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
Expand All @@ -127,6 +137,7 @@ impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
assert_eq!(id, self.id);
self.lock.push_front(val);
self.added.fetch_add(1, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
}
}
Expand Down
58 changes: 49 additions & 9 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,60 @@ fn blocking_queue_depth() {
}

#[test]
fn active_tasks_count() {
fn num_active_tasks() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
assert_eq!(0, metrics.num_active_tasks());
rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.num_active_tasks());
}))
.unwrap();

assert_eq!(0, rt.metrics().num_active_tasks());

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
assert_eq!(0, metrics.num_active_tasks());
rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.num_active_tasks());
}))
.unwrap();

// try for 10 seconds to see if this eventually succeeds.
// wake_join() is called before the task is released, so in multithreaded
// code, this means we sometimes exit the block_on before the counter decrements.
for _ in 0..100 {
if rt.metrics().num_active_tasks() == 0 {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert_eq!(0, rt.metrics().num_active_tasks());
}

#[test]
fn spawned_tasks_count() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.spawned_tasks_count());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.spawned_tasks_count());
}))
.unwrap();

assert_eq!(1, rt.metrics().spawned_tasks_count());

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.spawned_tasks_count());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.spawned_tasks_count());
}))
.unwrap();

assert_eq!(1, rt.metrics().spawned_tasks_count());
}

#[test]
Expand Down

0 comments on commit bb82406

Please sign in to comment.