From 4ad7dd43c86b88d17f8ecdef1adcae2aab51ddb7 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 28 Nov 2023 08:41:20 +0000 Subject: [PATCH] switch to CounterPair struct, improve reliability of flaky test --- tokio/src/runtime/metrics/runtime.rs | 45 +++++++++---------- tokio/src/runtime/mod.rs | 17 +++++++ .../runtime/scheduler/current_thread/mod.rs | 12 +---- tokio/src/runtime/scheduler/mod.rs | 14 ++---- .../scheduler/multi_thread/handle/metrics.rs | 14 ++---- .../multi_thread_alt/handle/metrics.rs | 14 ++---- tokio/src/runtime/task/list.rs | 17 +++---- tokio/src/util/linked_list.rs | 18 ++++---- tokio/tests/rt_metrics.rs | 31 ++++++------- 9 files changed, 77 insertions(+), 105 deletions(-) diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 32ed688e021..9cfc0b629c6 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -1,4 +1,4 @@ -use crate::runtime::Handle; +use crate::runtime::{CounterPair, Handle}; use std::ops::Range; use std::sync::atomic::Ordering::Relaxed; @@ -15,6 +15,18 @@ pub struct RuntimeMetrics { handle: Handle, } +impl CounterPair { + /// Determines the current length of the pair + pub fn len(&self) -> usize { + (self.inc - self.dec) as usize + } + + /// Determines if the counter pair represents an empty collection + pub fn is_empty(&self) -> bool { + self.inc == self.dec + } +} + impl RuntimeMetrics { pub(crate) fn new(handle: Handle) -> RuntimeMetrics { RuntimeMetrics { handle } @@ -85,10 +97,10 @@ impl RuntimeMetrics { /// } /// ``` pub fn active_tasks_count(&self) -> usize { - self.handle.inner.active_tasks_count() + self.handle.inner.task_counts().len() } - /// Returns the number of started tasks in the runtime. + /// Returns a counter pair representing the number of started and completed tasks in the runtime. /// /// # Examples /// @@ -100,30 +112,13 @@ impl RuntimeMetrics { /// let metrics = Handle::current().metrics(); /// /// let n = metrics.start_tasks_count(); - /// println!("Runtime has {} active tasks", n); - /// } - /// ``` - pub fn start_tasks_count(&self) -> u64 { - self.handle.inner.start_tasks_count() - } - - /// Returns the number of finished tasks in the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.stop_tasks_count(); - /// println!("Runtime has {} active tasks", n); + /// println!("Runtime has {} started tasks", n.inc); + /// println!("Runtime has {} completed tasks", n.dec); + /// println!("Runtime has {} active tasks", n.len()); /// } /// ``` - pub fn stop_tasks_count(&self) -> u64 { - self.handle.inner.stop_tasks_count() + pub fn task_counts(&self) -> CounterPair { + self.handle.inner.task_counts() } /// Returns the number of idle threads, which have spawned by the runtime diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index e3369cb2249..29a57c3647f 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -251,6 +251,7 @@ cfg_rt! { cfg_metrics! { mod metrics; pub use metrics::{RuntimeMetrics, HistogramScale}; + pub use crate::runtime::counter_pair::CounterPair; pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder}; @@ -267,3 +268,19 @@ cfg_rt! { /// After thread starts / before thread stops type Callback = std::sync::Arc; } + +pub(crate) mod counter_pair { + /// A gauge represented as two counters. + /// + /// Instead of decrementing a gauge, we increment a decrements counter. + /// This is beneficial as it allows you to observe activity spikes that occur + /// inbetween a scrape interval + #[derive(Copy, Clone, Debug, PartialEq, Eq)] + #[allow(unreachable_pub)] // rust-lang/rust#57411 + pub struct CounterPair { + /// Tracks how many times this gauge was incremented + pub inc: u64, + /// Tracks how many times this gauge was decremeneted + pub dec: u64, + } +} diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index a1ffccbbb82..14a714ef5a8 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -539,16 +539,8 @@ cfg_metrics! { self.blocking_spawner.queue_depth() } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } - - pub(crate) fn start_tasks_count(&self) -> u64 { - self.shared.owned.start_tasks_count() - } - - pub(crate) fn stop_tasks_count(&self) -> u64 { - self.shared.owned.stop_tasks_count() + pub(crate) fn tasks_count(&self) -> crate::runtime::counter_pair::CounterPair { + self.shared.owned.tasks_count() } } } diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 6df81aa88dd..4936a4ba96d 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -164,7 +164,7 @@ cfg_rt! { } cfg_metrics! { - use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + use crate::runtime::{SchedulerMetrics, WorkerMetrics, counter_pair::CounterPair}; impl Handle { pub(crate) fn num_workers(&self) -> usize { @@ -185,16 +185,8 @@ cfg_rt! { match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads()) } - pub(crate) fn active_tasks_count(&self) -> usize { - match_flavor!(self, Handle(handle) => handle.active_tasks_count()) - } - - pub(crate) fn start_tasks_count(&self) -> u64 { - match_flavor!(self, Handle(handle) => handle.start_tasks_count()) - } - - pub(crate) fn stop_tasks_count(&self) -> u64 { - match_flavor!(self, Handle(handle) => handle.stop_tasks_count()) + pub(crate) fn task_counts(&self) -> CounterPair { + match_flavor!(self, Handle(handle) => handle.tasks_count()) } pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { diff --git a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs index d279689cca7..7afc933db41 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs @@ -1,6 +1,6 @@ use super::Handle; -use crate::runtime::{SchedulerMetrics, WorkerMetrics}; +use crate::runtime::{counter_pair::CounterPair, SchedulerMetrics, WorkerMetrics}; impl Handle { pub(crate) fn num_workers(&self) -> usize { @@ -15,16 +15,8 @@ impl Handle { self.blocking_spawner.num_idle_threads() } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } - - pub(crate) fn start_tasks_count(&self) -> u64 { - self.shared.owned.start_tasks_count() - } - - pub(crate) fn stop_tasks_count(&self) -> u64 { - self.shared.owned.stop_tasks_count() + pub(crate) fn tasks_count(&self) -> CounterPair { + self.shared.owned.tasks_count() } pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs b/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs index d279689cca7..7afc933db41 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs @@ -1,6 +1,6 @@ use super::Handle; -use crate::runtime::{SchedulerMetrics, WorkerMetrics}; +use crate::runtime::{counter_pair::CounterPair, SchedulerMetrics, WorkerMetrics}; impl Handle { pub(crate) fn num_workers(&self) -> usize { @@ -15,16 +15,8 @@ impl Handle { self.blocking_spawner.num_idle_threads() } - pub(crate) fn active_tasks_count(&self) -> usize { - self.shared.owned.active_tasks_count() - } - - pub(crate) fn start_tasks_count(&self) -> u64 { - self.shared.owned.start_tasks_count() - } - - pub(crate) fn stop_tasks_count(&self) -> u64 { - self.shared.owned.stop_tasks_count() + pub(crate) fn tasks_count(&self) -> CounterPair { + self.shared.owned.tasks_count() } pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { diff --git a/tokio/src/runtime/task/list.rs b/tokio/src/runtime/task/list.rs index 387b4bbc142..eed30c9e69b 100644 --- a/tokio/src/runtime/task/list.rs +++ b/tokio/src/runtime/task/list.rs @@ -9,6 +9,7 @@ use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::loom::sync::Mutex; +use crate::runtime::counter_pair::CounterPair; use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task}; use crate::util::linked_list::{CountedLinkedList, Link, LinkedList}; @@ -166,16 +167,12 @@ impl OwnedTasks { } } - pub(crate) fn active_tasks_count(&self) -> usize { - self.inner.lock().list.count() - } - - pub(crate) fn start_tasks_count(&self) -> u64 { - self.inner.lock().list.added() - } - - pub(crate) fn stop_tasks_count(&self) -> u64 { - self.inner.lock().list.removed() + pub(crate) fn tasks_count(&self) -> CounterPair { + let lock = self.inner.lock(); + CounterPair { + inc: lock.list.added(), + dec: lock.list.removed(), + } } pub(crate) fn remove(&self, task: &Task) -> Option> { diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 7825c68afe5..bc76ea3bf1a 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -272,12 +272,6 @@ impl CountedLinkedList { val } - pub(crate) fn count(&self) -> usize { - // this subtraction can't underflow. - // this cast can't overflow because the length of the linked list can't exceed usize. - (self.added - self.removed) as usize - } - pub(crate) fn added(&self) -> u64 { self.added } @@ -811,21 +805,25 @@ pub(crate) mod tests { #[test] fn count() { let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new(); - assert_eq!(0, list.count()); + assert_eq!(0, list.added()); + assert_eq!(0, list.removed()); let a = entry(5); let b = entry(7); list.push_front(a.as_ref()); list.push_front(b.as_ref()); - assert_eq!(2, list.count()); + assert_eq!(2, list.added()); + assert_eq!(0, list.removed()); list.pop_back(); - assert_eq!(1, list.count()); + assert_eq!(2, list.added()); + assert_eq!(1, list.removed()); unsafe { list.remove(ptr(&b)); } - assert_eq!(0, list.count()); + assert_eq!(2, list.added()); + assert_eq!(2, list.removed()); } /// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module. diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index d5a2756a680..bce85f04640 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex}; use std::task::Poll; use tokio::macros::support::poll_fn; +use tokio::runtime::CounterPair; use tokio::runtime::Runtime; use tokio::task::consume_budget; use tokio::time::{self, Duration}; @@ -104,36 +105,32 @@ fn active_tasks_count() { fn active_tasks_count_pairs() { let rt = current_thread(); let metrics = rt.metrics(); - assert_eq!(0, metrics.start_tasks_count()); - assert_eq!(0, metrics.stop_tasks_count()); + assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts()); rt.block_on(rt.spawn(async move { - assert_eq!(1, metrics.start_tasks_count()); - assert_eq!(0, metrics.stop_tasks_count()); + assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts()); })) .unwrap(); - assert_eq!(1, rt.metrics().start_tasks_count()); - assert_eq!(1, rt.metrics().stop_tasks_count()); + assert_eq!(CounterPair { inc: 1, dec: 1 }, rt.metrics().task_counts()); let rt = threaded(); let metrics = rt.metrics(); - assert_eq!(0, metrics.start_tasks_count()); - assert_eq!(0, metrics.stop_tasks_count()); + assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts()); rt.block_on(rt.spawn(async move { - assert_eq!(1, metrics.start_tasks_count()); - assert_eq!(0, metrics.stop_tasks_count()); + assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts()); })) .unwrap(); - // for some reason, sometimes the stop count doesn't get a chance to incremenet before we get here. - // Only observed on single-cpu systems. Most likely the worker thread doesn't a chance to clean up - // the spawned task yet. We yield to give it an opportunity. - std::thread::yield_now(); - - assert_eq!(1, rt.metrics().start_tasks_count()); - assert_eq!(1, rt.metrics().stop_tasks_count()); + for _ in 0..100 { + if rt.metrics().task_counts() == (CounterPair { inc: 1, dec: 1 }) { + return; + } + // on single threaded machines (like in CI), we need to force the OS to run the runtime threads + std::thread::yield_now(); + } + panic!("runtime didn't decrement active task gauge") } #[test]