Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions tokio/src/runtime/metrics/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ pub(crate) struct MetricsBatch {
/// Instant at which work last resumed (continued after park).
processing_scheduled_tasks_started_at: Instant,

#[cfg(tokio_unstable)]
/// Number of times the worker parked.
park_count: u64,

#[cfg(tokio_unstable)]
/// Number of times the worker parked and unparked.
park_unpark_count: u64,

Expand Down Expand Up @@ -80,6 +78,8 @@ impl MetricsBatch {
MetricsBatch {
busy_duration_total: 0,
processing_scheduled_tasks_started_at: now,
park_count: 0,
park_unpark_count: 0,
}
}
},
Expand Down Expand Up @@ -120,7 +120,12 @@ impl MetricsBatch {
cfg_metrics_variant! {
stable: {
#[inline(always)]
fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {}
fn submit_unstable(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
worker.park_count.store(self.park_count, Relaxed);
worker
.park_unpark_count
.store(self.park_unpark_count, Relaxed);
}
},
unstable: {
#[inline(always)]
Expand Down Expand Up @@ -153,7 +158,10 @@ impl MetricsBatch {
cfg_metrics_variant! {
stable: {
/// The worker is about to park.
pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn about_to_park(&mut self) {
self.park_count += 1;
self.park_unpark_count += 1;
}
},
unstable: {
/// The worker is about to park.
Expand All @@ -171,18 +179,9 @@ impl MetricsBatch {
}
}
}

cfg_metrics_variant! {
stable: {
/// The worker was unparked.
pub(crate) fn unparked(&mut self) {}
},
unstable: {
/// The worker was unparked.
pub(crate) fn unparked(&mut self) {
self.park_unpark_count += 1;
}
}
/// The worker was unparked.
pub(crate) fn unparked(&mut self) {
self.park_unpark_count += 1;
}

/// Start processing a batch of tasks
Expand Down
195 changes: 97 additions & 98 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,103 @@ impl RuntimeMetrics {
.load(Relaxed);
Duration::from_nanos(nanos)
}

/// Returns the total number of times the given worker thread has parked.
///
/// The worker park count starts at zero when the runtime is created and
/// increases by one each time the worker parks the thread waiting for new
/// inbound events to process. This usually means the worker has processed
/// all pending work and is currently idle.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_park_count(0);
/// println!("worker 0 parked {} times", n);
/// }
/// ```
pub fn worker_park_count(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.park_count
.load(Relaxed)
}

/// Returns the total number of times the given worker thread has parked
/// and unparked.
///
/// The worker park/unpark count starts at zero when the runtime is created
/// and increases by one each time the worker parks the thread waiting for
/// new inbound events to process. This usually means the worker has processed
/// all pending work and is currently idle. When new work becomes available,
/// the worker is unparked and the park/unpark count is again increased by one.
///
/// An odd count means that the worker is currently parked.
/// An even count means that the worker is currently active.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
/// let n = metrics.worker_park_unpark_count(0);
///
/// println!("worker 0 parked and unparked {} times", n);
///
/// if n % 2 == 0 {
/// println!("worker 0 is active");
/// } else {
/// println!("worker 0 is parked");
/// }
/// }
/// ```
pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.park_unpark_count
.load(Relaxed)
}
}

cfg_unstable_metrics! {
Expand Down Expand Up @@ -318,104 +415,6 @@ impl RuntimeMetrics {
.load(Relaxed)
}

/// Returns the total number of times the given worker thread has parked.
///
/// The worker park count starts at zero when the runtime is created and
/// increases by one each time the worker parks the thread waiting for new
/// inbound events to process. This usually means the worker has processed
/// all pending work and is currently idle.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.worker_park_count(0);
/// println!("worker 0 parked {} times", n);
/// }
/// ```
pub fn worker_park_count(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.park_count
.load(Relaxed)
}

/// Returns the total number of times the given worker thread has parked
/// and unparked.
///
/// The worker park/unpark count starts at zero when the runtime is created
/// and increases by one each time the worker parks the thread waiting for
/// new inbound events to process. This usually means the worker has processed
/// all pending work and is currently idle. When new work becomes available,
/// the worker is unparked and the park/unpark count is again increased by one.
///
/// An odd count means that the worker is currently parked.
/// An even count means that the worker is currently active.
///
/// The counter is monotonically increasing. It is never decremented or
/// reset to zero.
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be between 0 and `num_workers()`. The index uniquely identifies a single
/// worker and will continue to identify the worker throughout the lifetime
/// of the runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
/// let n = metrics.worker_park_unpark_count(0);
///
/// println!("worker 0 parked and unparked {} times", n);
///
/// if n % 2 == 0 {
/// println!("worker 0 is active");
/// } else {
/// println!("worker 0 is parked");
/// }
/// }
/// ```
pub fn worker_park_unpark_count(&self, worker: usize) -> u64 {
self.handle
.inner
.worker_metrics(worker)
.park_unpark_count
.load(Relaxed)
}


/// Returns the number of times the given worker thread unparked but
/// performed no work before parking again.
///
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ pub(crate) struct WorkerMetrics {
/// Thread id of worker thread.
thread_id: Mutex<Option<ThreadId>>,

#[cfg(tokio_unstable)]
/// Number of times the worker parked.
pub(crate) park_count: MetricAtomicU64,

#[cfg(tokio_unstable)]
/// Number of times the worker parked and unparked.
pub(crate) park_unpark_count: MetricAtomicU64,

Expand Down
59 changes: 59 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::mpsc;
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::time;

#[test]
fn num_workers() {
Expand Down Expand Up @@ -125,6 +126,64 @@ fn worker_total_busy_duration() {
}
}

#[test]
fn worker_park_count() {
let rt = current_thread();
let metrics = rt.metrics();
rt.block_on(async {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
assert!(1 <= metrics.worker_park_count(0));

let rt = threaded();
let metrics = rt.metrics();
rt.block_on(async {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
assert!(1 <= metrics.worker_park_count(0));
assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_park_unpark_count() {
let rt = current_thread();
let metrics = rt.metrics();
rt.block_on(rt.spawn(async {})).unwrap();
drop(rt);
assert!(2 <= metrics.worker_park_unpark_count(0));

let rt = threaded();
let metrics = rt.metrics();

// Wait for workers to be parked after runtime startup.
for _ in 0..100 {
if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert_eq!(1, metrics.worker_park_unpark_count(0));
assert_eq!(1, metrics.worker_park_unpark_count(1));

// Spawn a task to unpark and then park a worker.
rt.block_on(rt.spawn(async {})).unwrap();
for _ in 0..100 {
if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1));

// Both threads unpark for runtime shutdown.
drop(rt);
assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
assert_eq!(0, metrics.worker_park_unpark_count(1) % 2);
assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1));
}

fn try_block_threaded(rt: &Runtime) -> Result<Vec<mpsc::Sender<()>>, mpsc::RecvTimeoutError> {
let (tx, rx) = mpsc::channel();

Expand Down
Loading
Loading