Skip to content

Commit

Permalink
metrics: add MetricAtomicUsize for usized-metrics (#6598)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcoh authored Jun 6, 2024
1 parent 16fccaf commit 8e15c23
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 29 deletions.
21 changes: 11 additions & 10 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
use crate::util::metric_atomics::MetricAtomicUsize;

use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering;
use std::time::Duration;

pub(crate) struct BlockingPool {
Expand All @@ -26,9 +27,9 @@ pub(crate) struct Spawner {

#[derive(Default)]
pub(crate) struct SpawnerMetrics {
num_threads: AtomicUsize,
num_idle_threads: AtomicUsize,
queue_depth: AtomicUsize,
num_threads: MetricAtomicUsize,
num_idle_threads: MetricAtomicUsize,
queue_depth: MetricAtomicUsize,
}

impl SpawnerMetrics {
Expand All @@ -47,27 +48,27 @@ impl SpawnerMetrics {
}

fn inc_num_threads(&self) {
self.num_threads.fetch_add(1, Ordering::Relaxed);
self.num_threads.increment();
}

fn dec_num_threads(&self) {
self.num_threads.fetch_sub(1, Ordering::Relaxed);
self.num_threads.decrement();
}

fn inc_num_idle_threads(&self) {
self.num_idle_threads.fetch_add(1, Ordering::Relaxed);
self.num_idle_threads.increment();
}

fn dec_num_idle_threads(&self) -> usize {
self.num_idle_threads.fetch_sub(1, Ordering::Relaxed)
self.num_idle_threads.decrement()
}

fn inc_queue_depth(&self) {
self.queue_depth.fetch_add(1, Ordering::Relaxed);
self.queue_depth.increment();
}

fn dec_queue_depth(&self) {
self.queue_depth.fetch_sub(1, Ordering::Relaxed);
self.queue_depth.decrement();
}
}

Expand Down
23 changes: 4 additions & 19 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::runtime::metrics::Histogram;
use crate::runtime::Config;
use crate::util::metric_atomics::MetricAtomicU64;
// This is NOT the Loom atomic. To avoid an unnecessary state explosion in loom,
// all metrics use regular atomics.
use std::sync::atomic::AtomicUsize;
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;

/// Retrieve runtime worker metrics.
Expand All @@ -13,7 +10,7 @@ use std::sync::atomic::Ordering::Relaxed;
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
#[derive(Debug)]
#[derive(Debug, Default)]
#[repr(align(128))]
pub(crate) struct WorkerMetrics {
/// Number of times the worker parked.
Expand Down Expand Up @@ -45,7 +42,7 @@ pub(crate) struct WorkerMetrics {

/// Number of tasks currently in the local queue. Used only by the
/// current-thread scheduler.
pub(crate) queue_depth: AtomicUsize,
pub(crate) queue_depth: MetricAtomicUsize,

/// If `Some`, tracks the number of polls by duration range.
pub(super) poll_count_histogram: Option<Histogram>,
Expand All @@ -62,19 +59,7 @@ impl WorkerMetrics {
}

pub(crate) fn new() -> WorkerMetrics {
WorkerMetrics {
park_count: MetricAtomicU64::new(0),
noop_count: MetricAtomicU64::new(0),
steal_count: MetricAtomicU64::new(0),
steal_operations: MetricAtomicU64::new(0),
poll_count: MetricAtomicU64::new(0),
mean_poll_time: MetricAtomicU64::new(0),
overflow_count: MetricAtomicU64::new(0),
busy_duration_total: MetricAtomicU64::new(0),
local_schedule_count: MetricAtomicU64::new(0),
queue_depth: AtomicUsize::new(0),
poll_count_histogram: None,
}
WorkerMetrics::default()
}

pub(crate) fn queue_depth(&self) -> usize {
Expand Down
28 changes: 28 additions & 0 deletions tokio/src/util/metric_atomics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,31 @@ impl MetricAtomicU64 {
pub(crate) fn new(_value: u64) -> Self { Self { } }
}
}

#[cfg_attr(not(all(tokio_unstable, feature = "rt")), allow(dead_code))]
/// `AtomicUsize` for use in metrics.
///
/// This exposes simplified APIs for use in metrics & uses `std::sync` instead of Loom to avoid polluting loom logs with metric information.
#[derive(Debug, Default)]
pub(crate) struct MetricAtomicUsize {
value: std::sync::atomic::AtomicUsize,
}

#[cfg_attr(not(all(tokio_unstable, feature = "rt")), allow(dead_code))]
impl MetricAtomicUsize {
pub(crate) fn load(&self, ordering: Ordering) -> usize {
self.value.load(ordering)
}

pub(crate) fn store(&self, val: usize, ordering: Ordering) {
self.value.store(val, ordering)
}

pub(crate) fn increment(&self) -> usize {
self.value.fetch_add(1, Ordering::Relaxed)
}

pub(crate) fn decrement(&self) -> usize {
self.value.fetch_sub(1, Ordering::Relaxed)
}
}

0 comments on commit 8e15c23

Please sign in to comment.