Skip to content

Commit

Permalink
switch to CounterPair struct, improve reliability of flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Nov 28, 2023
1 parent a4d95ac commit 4ad7dd4
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 105 deletions.
45 changes: 20 additions & 25 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::Handle;
use crate::runtime::{CounterPair, Handle};

use std::ops::Range;
use std::sync::atomic::Ordering::Relaxed;
Expand All @@ -15,6 +15,18 @@ pub struct RuntimeMetrics {
handle: Handle,
}

impl CounterPair {
/// Determines the current length of the pair
pub fn len(&self) -> usize {
(self.inc - self.dec) as usize
}

/// Determines if the counter pair represents an empty collection
pub fn is_empty(&self) -> bool {
self.inc == self.dec
}
}

impl RuntimeMetrics {
pub(crate) fn new(handle: Handle) -> RuntimeMetrics {
RuntimeMetrics { handle }
Expand Down Expand Up @@ -85,10 +97,10 @@ impl RuntimeMetrics {
/// }
/// ```
pub fn active_tasks_count(&self) -> usize {
self.handle.inner.active_tasks_count()
self.handle.inner.task_counts().len()
}

/// Returns the number of started tasks in the runtime.
/// Returns a counter pair representing the number of started and completed tasks in the runtime.
///
/// # Examples
///
Expand All @@ -100,30 +112,13 @@ impl RuntimeMetrics {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.start_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn start_tasks_count(&self) -> u64 {
self.handle.inner.start_tasks_count()
}

/// Returns the number of finished tasks in the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.stop_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// println!("Runtime has {} started tasks", n.inc);
/// println!("Runtime has {} completed tasks", n.dec);
/// println!("Runtime has {} active tasks", n.len());
/// }
/// ```
pub fn stop_tasks_count(&self) -> u64 {
self.handle.inner.stop_tasks_count()
pub fn task_counts(&self) -> CounterPair {
self.handle.inner.task_counts()
}

/// Returns the number of idle threads, which have spawned by the runtime
Expand Down
17 changes: 17 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ cfg_rt! {
cfg_metrics! {
mod metrics;
pub use metrics::{RuntimeMetrics, HistogramScale};
pub use crate::runtime::counter_pair::CounterPair;

pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics, HistogramBuilder};

Expand All @@ -267,3 +268,19 @@ cfg_rt! {
/// After thread starts / before thread stops
type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
}

pub(crate) mod counter_pair {
/// A gauge represented as two counters.
///
/// Instead of decrementing a gauge, we increment a decrements counter.
/// This is beneficial as it allows you to observe activity spikes that occur
/// inbetween a scrape interval
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[allow(unreachable_pub)] // rust-lang/rust#57411
pub struct CounterPair {
/// Tracks how many times this gauge was incremented
pub inc: u64,
/// Tracks how many times this gauge was decremeneted
pub dec: u64,
}
}
12 changes: 2 additions & 10 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,16 +539,8 @@ cfg_metrics! {
self.blocking_spawner.queue_depth()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.shared.owned.start_tasks_count()
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.shared.owned.stop_tasks_count()
pub(crate) fn tasks_count(&self) -> crate::runtime::counter_pair::CounterPair {
self.shared.owned.tasks_count()
}
}
}
Expand Down
14 changes: 3 additions & 11 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ cfg_rt! {
}

cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};
use crate::runtime::{SchedulerMetrics, WorkerMetrics, counter_pair::CounterPair};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
Expand All @@ -185,16 +185,8 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
}

pub(crate) fn active_tasks_count(&self) -> usize {
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}

pub(crate) fn start_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.start_tasks_count())
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.stop_tasks_count())
pub(crate) fn task_counts(&self) -> CounterPair {
match_flavor!(self, Handle(handle) => handle.tasks_count())
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
Expand Down
14 changes: 3 additions & 11 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Handle;

use crate::runtime::{SchedulerMetrics, WorkerMetrics};
use crate::runtime::{counter_pair::CounterPair, SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
Expand All @@ -15,16 +15,8 @@ impl Handle {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.shared.owned.start_tasks_count()
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.shared.owned.stop_tasks_count()
pub(crate) fn tasks_count(&self) -> CounterPair {
self.shared.owned.tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
Expand Down
14 changes: 3 additions & 11 deletions tokio/src/runtime/scheduler/multi_thread_alt/handle/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Handle;

use crate::runtime::{SchedulerMetrics, WorkerMetrics};
use crate::runtime::{counter_pair::CounterPair, SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
Expand All @@ -15,16 +15,8 @@ impl Handle {
self.blocking_spawner.num_idle_threads()
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.shared.owned.start_tasks_count()
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.shared.owned.stop_tasks_count()
pub(crate) fn tasks_count(&self) -> CounterPair {
self.shared.owned.tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
Expand Down
17 changes: 7 additions & 10 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Mutex;
use crate::runtime::counter_pair::CounterPair;
use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};

Expand Down Expand Up @@ -166,16 +167,12 @@ impl<S: 'static> OwnedTasks<S> {
}
}

pub(crate) fn active_tasks_count(&self) -> usize {
self.inner.lock().list.count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.inner.lock().list.added()
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.inner.lock().list.removed()
pub(crate) fn tasks_count(&self) -> CounterPair {
let lock = self.inner.lock();
CounterPair {
inc: lock.list.added(),
dec: lock.list.removed(),
}
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
Expand Down
18 changes: 8 additions & 10 deletions tokio/src/util/linked_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,6 @@ impl<L: Link> CountedLinkedList<L, L::Target> {
val
}

pub(crate) fn count(&self) -> usize {
// this subtraction can't underflow.
// this cast can't overflow because the length of the linked list can't exceed usize.
(self.added - self.removed) as usize
}

pub(crate) fn added(&self) -> u64 {
self.added
}
Expand Down Expand Up @@ -811,21 +805,25 @@ pub(crate) mod tests {
#[test]
fn count() {
let mut list = CountedLinkedList::<&Entry, <&Entry as Link>::Target>::new();
assert_eq!(0, list.count());
assert_eq!(0, list.added());
assert_eq!(0, list.removed());

let a = entry(5);
let b = entry(7);
list.push_front(a.as_ref());
list.push_front(b.as_ref());
assert_eq!(2, list.count());
assert_eq!(2, list.added());
assert_eq!(0, list.removed());

list.pop_back();
assert_eq!(1, list.count());
assert_eq!(2, list.added());
assert_eq!(1, list.removed());

unsafe {
list.remove(ptr(&b));
}
assert_eq!(0, list.count());
assert_eq!(2, list.added());
assert_eq!(2, list.removed());
}

/// This is a fuzz test. You run it by entering `cargo fuzz run fuzz_linked_list` in CLI in `/tokio/` module.
Expand Down
31 changes: 14 additions & 17 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex};
use std::task::Poll;
use tokio::macros::support::poll_fn;

use tokio::runtime::CounterPair;
use tokio::runtime::Runtime;
use tokio::task::consume_budget;
use tokio::time::{self, Duration};
Expand Down Expand Up @@ -104,36 +105,32 @@ fn active_tasks_count() {
fn active_tasks_count_pairs() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts());
}))
.unwrap();

assert_eq!(1, rt.metrics().start_tasks_count());
assert_eq!(1, rt.metrics().stop_tasks_count());
assert_eq!(CounterPair { inc: 1, dec: 1 }, rt.metrics().task_counts());

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 0, dec: 0 }, metrics.task_counts());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(CounterPair { inc: 1, dec: 0 }, metrics.task_counts());
}))
.unwrap();

// for some reason, sometimes the stop count doesn't get a chance to incremenet before we get here.
// Only observed on single-cpu systems. Most likely the worker thread doesn't a chance to clean up
// the spawned task yet. We yield to give it an opportunity.
std::thread::yield_now();

assert_eq!(1, rt.metrics().start_tasks_count());
assert_eq!(1, rt.metrics().stop_tasks_count());
for _ in 0..100 {
if rt.metrics().task_counts() == (CounterPair { inc: 1, dec: 1 }) {
return;
}
// on single threaded machines (like in CI), we need to force the OS to run the runtime threads
std::thread::yield_now();
}
panic!("runtime didn't decrement active task gauge")
}

#[test]
Expand Down

0 comments on commit 4ad7dd4

Please sign in to comment.