diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs
index 8d30f66f6ff..f3e02a2a602 100644
--- a/tokio/src/runtime/metrics/runtime.rs
+++ b/tokio/src/runtime/metrics/runtime.rs
@@ -75,7 +75,15 @@ impl RuntimeMetrics {
         self.handle.inner.num_blocking_threads()
     }
 
-    /// Returns the number of active tasks in the runtime.
+    #[deprecated = "Renamed to num_active_tasks"]
+    /// Renamed to [`RuntimeMetrics::num_active_tasks`]
+    pub fn active_tasks_count(&self) -> usize {
+        self.num_active_tasks()
+    }
+
+    /// Returns the current number of active tasks in the runtime.
+    ///
+    /// This value increases and decreases over time as tasks are spawned and as they are completed or cancelled.
     ///
     /// # Examples
     ///
@@ -86,14 +94,38 @@ impl RuntimeMetrics {
     /// async fn main() {
     ///     let metrics = Handle::current().metrics();
     ///
-    ///     let n = metrics.active_tasks_count();
+    ///     let n = metrics.num_active_tasks();
     ///     println!("Runtime has {} active tasks", n);
     /// }
     /// ```
-    pub fn active_tasks_count(&self) -> usize {
+    pub fn num_active_tasks(&self) -> usize {
         self.handle.inner.active_tasks_count()
     }
 
+    /// Returns the number of tasks spawned in this runtime since it was created.
+    ///
+    /// This count starts at zero when the runtime is created and increases by one each time a task is spawned.
+    ///
+    /// The counter is monotonically increasing. It is never decremented or
+    /// reset to zero.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.spawned_tasks_count();
+    ///     println!("Runtime has had {} tasks spawned", n);
+    /// }
+    /// ```
+    pub fn spawned_tasks_count(&self) -> u64 {
+        self.handle.inner.spawned_tasks_count()
+    }
+
     /// Returns the number of idle threads, which have spawned by the runtime
     /// for `spawn_blocking` calls.
     ///
diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs
index b9c23837a58..805b92a818d 100644
--- a/tokio/src/runtime/scheduler/current_thread/mod.rs
+++ b/tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -536,6 +536,10 @@ cfg_unstable_metrics! {
         pub(crate) fn active_tasks_count(&self) -> usize {
             self.shared.owned.active_tasks_count()
         }
+
+        pub(crate) fn spawned_tasks_count(&self) -> u64 {
+            self.shared.owned.spawned_tasks_count()
+        }
     }
 }
 
diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs
index 3cbba11b752..97d9ca6ba17 100644
--- a/tokio/src/runtime/scheduler/mod.rs
+++ b/tokio/src/runtime/scheduler/mod.rs
@@ -191,6 +191,10 @@ cfg_rt! {
             match_flavor!(self, Handle(handle) => handle.active_tasks_count())
         }
 
+        pub(crate) fn spawned_tasks_count(&self) -> u64 {
+            match_flavor!(self, Handle(handle) => handle.spawned_tasks_count())
+        }
+
         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
             match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
         }
diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
index 6ced245ee5b..b1c5bb29f96 100644
--- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
+++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
@@ -25,6 +25,10 @@ impl Handle {
         self.shared.owned.active_tasks_count()
     }
 
+    pub(crate) fn spawned_tasks_count(&self) -> u64 {
+        self.shared.owned.spawned_tasks_count()
+    }
+
     pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
         &self.shared.scheduler_metrics
     }
diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs
index 3d614b478c5..38d9f4b1cc6 100644
--- a/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs
+++ b/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs
@@ -22,6 +22,10 @@ impl Handle {
         self.shared.owned.active_tasks_count()
     }
 
+    pub(crate) fn spawned_tasks_count(&self) -> u64 {
+        self.shared.owned.spawned_tasks_count()
+    }
+
     pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
         &self.shared.scheduler_metrics
     }
diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs
index 41a5fb439c9..5f91cc83da9 100644
--- a/tokio/src/runtime/task/list.rs
+++ b/tokio/src/runtime/task/list.rs
@@ -170,6 +170,10 @@ impl<S: 'static> OwnedTasks<S> {
         self.list.len()
     }
 
+    pub(crate) fn spawned_tasks_count(&self) -> u64 {
+        self.list.added()
+    }
+
     pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
         // If the task's owner ID is `None` then it is not part of any list and
         // doesn't need removing.
diff --git a/tokio/src/util/sharded_list.rs b/tokio/src/util/sharded_list.rs
index 4da0bcdf7f1..46c34f14133 100644
--- a/tokio/src/util/sharded_list.rs
+++ b/tokio/src/util/sharded_list.rs
@@ -1,6 +1,7 @@
 use std::ptr::NonNull;
 use std::sync::atomic::Ordering;
 
+use crate::loom::sync::atomic::AtomicU64;
 use crate::loom::sync::{Mutex, MutexGuard};
 
 use std::sync::atomic::AtomicUsize;
@@ -14,6 +15,7 @@ use super::linked_list::{Link, LinkedList};
 /// Note: Due to its inner sharded design, the order of nodes cannot be guaranteed.
 pub(crate) struct ShardedList<L, T> {
     lists: Box<[Mutex<LinkedList<L, T>>]>,
+    added: AtomicU64,
     count: AtomicUsize,
     shard_mask: usize,
 }
@@ -42,6 +44,7 @@ impl<L, T> ShardedList<L, T> {
         }
         Self {
             lists: lists.into_boxed_slice(),
+            added: AtomicU64::new(0),
             count: AtomicUsize::new(0),
             shard_mask,
         }
@@ -51,6 +54,7 @@ impl<L, T> ShardedList<L, T> {
 
 /// Used to get the lock of shard.
 pub(crate) struct ShardGuard<'a, L, T> {
     lock: MutexGuard<'a, LinkedList<L, T>>,
+    added: &'a AtomicU64,
     count: &'a AtomicUsize,
     id: usize,
 }
@@ -92,6 +96,7 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
         let id = unsafe { L::get_shard_id(L::as_raw(val)) };
         ShardGuard {
             lock: self.shard_inner(id),
+            added: &self.added,
             count: &self.count,
             id,
         }
@@ -102,6 +107,11 @@ impl<L: ShardedListItem> ShardedList<L, L::Target> {
         self.count.load(Ordering::Relaxed)
     }
 
+    /// Gets the total number of elements added to this list.
+    pub(crate) fn added(&self) -> u64 {
+        self.added.load(Ordering::Relaxed)
+    }
+
     /// Returns whether the linked list does not contain any node.
     pub(crate) fn is_empty(&self) -> bool {
         self.len() == 0
@@ -127,6 +137,7 @@ impl<'a, L: ShardedListItem> ShardGuard<'a, L, L::Target> {
         let id = unsafe { L::get_shard_id(L::as_raw(&val)) };
         assert_eq!(id, self.id);
         self.lock.push_front(val);
+        self.added.fetch_add(1, Ordering::Relaxed);
         self.count.fetch_add(1, Ordering::Relaxed);
     }
 }
diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs
index 2446deb6b41..0680d7960a5 100644
--- a/tokio/tests/rt_metrics.rs
+++ b/tokio/tests/rt_metrics.rs
@@ -93,20 +93,60 @@ fn blocking_queue_depth() {
 }
 
 #[test]
-fn active_tasks_count() {
+fn num_active_tasks() {
     let rt = current_thread();
     let metrics = rt.metrics();
-    assert_eq!(0, metrics.active_tasks_count());
-    rt.spawn(async move {
-        assert_eq!(1, metrics.active_tasks_count());
-    });
+    assert_eq!(0, metrics.num_active_tasks());
+    rt.block_on(rt.spawn(async move {
+        assert_eq!(1, metrics.num_active_tasks());
+    }))
+    .unwrap();
+
+    assert_eq!(0, rt.metrics().num_active_tasks());
 
     let rt = threaded();
     let metrics = rt.metrics();
-    assert_eq!(0, metrics.active_tasks_count());
-    rt.spawn(async move {
-        assert_eq!(1, metrics.active_tasks_count());
-    });
+    assert_eq!(0, metrics.num_active_tasks());
+    rt.block_on(rt.spawn(async move {
+        assert_eq!(1, metrics.num_active_tasks());
+    }))
+    .unwrap();
+
+    // try for 10 seconds to see if this eventually succeeds.
+    // wake_join() is called before the task is released, so in multithreaded
+    // code, this means we sometimes exit the block_on before the counter decrements.
+    for _ in 0..100 {
+        if rt.metrics().num_active_tasks() == 0 {
+            break;
+        }
+        std::thread::sleep(std::time::Duration::from_millis(100));
+    }
+    assert_eq!(0, rt.metrics().num_active_tasks());
+}
+
+#[test]
+fn spawned_tasks_count() {
+    let rt = current_thread();
+    let metrics = rt.metrics();
+    assert_eq!(0, metrics.spawned_tasks_count());
+
+    rt.block_on(rt.spawn(async move {
+        assert_eq!(1, metrics.spawned_tasks_count());
+    }))
+    .unwrap();
+
+    assert_eq!(1, rt.metrics().spawned_tasks_count());
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+    assert_eq!(0, metrics.spawned_tasks_count());
+
+    rt.block_on(rt.spawn(async move {
+        assert_eq!(1, metrics.spawned_tasks_count());
+    }))
+    .unwrap();
+
+    assert_eq!(1, rt.metrics().spawned_tasks_count());
 }
 
 #[test]