
feat: add task counter pairs #6114

Merged · 1 commit · Jun 9, 2024
38 changes: 35 additions & 3 deletions tokio/src/runtime/metrics/runtime.rs
@@ -75,7 +75,15 @@ impl RuntimeMetrics {
self.handle.inner.num_blocking_threads()
}

/// Returns the number of active tasks in the runtime.
/// Renamed to [`RuntimeMetrics::num_active_tasks`]
#[deprecated = "Renamed to num_active_tasks"]
pub fn active_tasks_count(&self) -> usize {
self.num_active_tasks()
}

/// Returns the current number of active tasks in the runtime.
///
/// This value increases and decreases over time as tasks are spawned and as they are completed or cancelled.
///
/// # Examples
///
@@ -86,14 +94,38 @@ impl RuntimeMetrics {
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.active_tasks_count();
/// let n = metrics.num_active_tasks();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
pub fn num_active_tasks(&self) -> usize {
self.handle.inner.active_tasks_count()
}

/// Returns the number of tasks spawned in this runtime since it was created.
///
/// This count starts at zero when the runtime is created and increases by one each time a task is spawned.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.spawned_tasks_count();
/// println!("Runtime has had {} tasks spawned", n);
/// }
/// ```
pub fn spawned_tasks_count(&self) -> u64 {
self.handle.inner.spawned_tasks_count()
}

/// Returns the number of idle threads, which have been spawned by the
/// runtime for `spawn_blocking` calls.
///
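Taken together, these two methods are the "counter pair" the PR title refers to: `spawned_tasks_count()` is a monotone counter and `num_active_tasks()` is a gauge, so their difference approximates how many tasks have finished. The following usage sketch is not part of the diff; it assumes a Tokio build with these unstable metrics enabled via `--cfg tokio_unstable`:

```rust
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    let metrics = Handle::current().metrics();

    // Spawn a few tasks and wait for each one to finish.
    for _ in 0..10 {
        tokio::spawn(async {}).await.unwrap();
    }

    // `spawned_tasks_count()` only ever grows; `num_active_tasks()` tracks
    // the live set. With no concurrent spawns between the two reads, their
    // difference approximates the number of completed (or cancelled) tasks.
    let spawned = metrics.spawned_tasks_count();
    let active = metrics.num_active_tasks() as u64;
    println!(
        "spawned: {spawned}, active: {active}, finished (approx.): {}",
        spawned - active
    );
}
```

The difference is only approximate: as the updated tests further down show, a task's join handle can wake before the runtime releases the task, so the gauge may lag briefly.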
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -536,6 +536,10 @@ cfg_unstable_metrics! {
pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}
}
}

4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
@@ -191,6 +191,10 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}
4 changes: 4 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
@@ -25,6 +25,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
@@ -22,6 +22,10 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.shared.owned.spawned_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
4 changes: 4 additions & 0 deletions tokio/src/runtime/task/list.rs
@@ -170,6 +170,10 @@ impl<S: 'static> OwnedTasks<S> {
self.list.len()
}

pub(crate) fn spawned_tasks_count(&self) -> u64 {
self.list.added()
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
// If the task's owner ID is `None` then it is not part of any list and
// doesn't need removing.
11 changes: 11 additions & 0 deletions tokio/src/util/sharded_list.rs
@@ -1,6 +1,7 @@
use std::ptr::NonNull;
use std::sync::atomic::Ordering;

use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::{Mutex, MutexGuard};
use std::sync::atomic::AtomicUsize;

@@ -14,6 +15,7 @@ use super::linked_list::{Link, LinkedList};
/// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
pub(crate) struct ShardedList<L, T> {
lists: Box<[Mutex<LinkedList<L, T>>]>,
added: AtomicU64,
count: AtomicUsize,
shard_mask: usize,
}
@@ -42,6 +44,7 @@ impl<L, T> ShardedList<L, T> {
}
Self {
lists: lists.into_boxed_slice(),
added: AtomicU64::new(0),
count: AtomicUsize::new(0),
shard_mask,
}
@@ -51,6 +54,7 @@
/// Used to get the lock of a shard.
pub(crate) struct ShardGuard<'a, L, T> {
lock: MutexGuard<'a, LinkedList<L, T>>,
added: &'a AtomicU64,
count: &'a AtomicUsize,
id: usize,
}
@@ -92,6 +96,7 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
let id = unsafe { L::get_shard_id(L::as_raw(val)) };
ShardGuard {
lock: self.shard_inner(id),
added: &self.added,
count: &self.count,
id,
}
@@ -102,6 +107,11 @@
self.count.load(Ordering::Relaxed)
}

/// Gets the total number of elements added to this list.
pub(crate) fn added(&self) -> u64 {
self.added.load(Ordering::Relaxed)
}

/// Returns whether the linked list does not contain any node.
pub(crate) fn is_empty(&self) -> bool {
self.len() == 0
@@ -127,6 +137,7 @@
let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
assert_eq!(id, self.id);
self.lock.push_front(val);
self.added.fetch_add(1, Ordering::Relaxed);
self.count.fetch_add(1, Ordering::Relaxed);
}
}
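The `added`/`count` fields in `ShardedList` are the core of the pattern: one monotone total plus one live gauge, both maintained with `Relaxed` ordering because they are read as statistics rather than used for synchronization. Below is a standalone sketch of the same idea; the names (`TaskCounters`, `on_push`, `on_remove`) are hypothetical and not part of the Tokio codebase:

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

// Hypothetical counter pair in the style of `ShardedList`: `added` is
// monotone, `count` is a live gauge. Relaxed ordering suffices because
// these values carry no synchronization obligations.
struct TaskCounters {
    added: AtomicU64,
    count: AtomicUsize,
}

impl TaskCounters {
    const fn new() -> Self {
        Self {
            added: AtomicU64::new(0),
            count: AtomicUsize::new(0),
        }
    }

    // Called when an element is inserted: both counters move up.
    fn on_push(&self) {
        self.added.fetch_add(1, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    // Called when an element is removed: only the gauge moves down.
    fn on_remove(&self) {
        self.count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Total insertions since creation; never decreases.
    fn added(&self) -> u64 {
        self.added.load(Ordering::Relaxed)
    }

    /// Elements currently held.
    fn len(&self) -> usize {
        self.count.load(Ordering::Relaxed)
    }
}

fn main() {
    let c = TaskCounters::new();
    c.on_push();
    c.on_push();
    c.on_remove();
    assert_eq!(c.added(), 2); // monotone: survives removals
    assert_eq!(c.len(), 1); // gauge: tracks the live set
}
```

Keeping the pair together lets a reader derive completions as `added() - len() as u64` without maintaining a third counter, which is the same trade-off the `ShardedList` change makes.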
58 changes: 49 additions & 9 deletions tokio/tests/rt_metrics.rs
@@ -93,20 +93,60 @@ fn blocking_queue_depth() {
}

#[test]
fn active_tasks_count() {
fn num_active_tasks() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
assert_eq!(0, metrics.num_active_tasks());
rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.num_active_tasks());
}))
.unwrap();

assert_eq!(0, rt.metrics().num_active_tasks());

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.active_tasks_count());
rt.spawn(async move {
assert_eq!(1, metrics.active_tasks_count());
});
assert_eq!(0, metrics.num_active_tasks());
rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.num_active_tasks());
}))
.unwrap();

// Try for up to 10 seconds to see if the counter eventually reaches zero.
// wake_join() is called before the task is released, so in multi-threaded
// code we can exit block_on before the counter decrements.
for _ in 0..100 {
if rt.metrics().num_active_tasks() == 0 {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert_eq!(0, rt.metrics().num_active_tasks());
}

#[test]
fn spawned_tasks_count() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.spawned_tasks_count());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.spawned_tasks_count());
}))
.unwrap();

assert_eq!(1, rt.metrics().spawned_tasks_count());

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.spawned_tasks_count());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.spawned_tasks_count());
}))
.unwrap();

assert_eq!(1, rt.metrics().spawned_tasks_count());
}

#[test]