From b69f16aa219818bc75e7ae6a22631d4e574efd39 Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Tue, 23 Jul 2024 09:15:54 +0200 Subject: [PATCH] metrics: add `worker_park_unpark_count` (#6696) --- tokio/src/runtime/metrics/batch.rs | 13 +++++ tokio/src/runtime/metrics/mock.rs | 1 + tokio/src/runtime/metrics/runtime.rs | 55 +++++++++++++++++++ tokio/src/runtime/metrics/worker.rs | 3 + .../runtime/scheduler/current_thread/mod.rs | 3 + .../runtime/scheduler/multi_thread/stats.rs | 4 ++ .../runtime/scheduler/multi_thread/worker.rs | 5 ++ .../scheduler/multi_thread_alt/stats.rs | 4 ++ .../scheduler/multi_thread_alt/worker.rs | 3 + tokio/tests/rt_unstable_metrics.rs | 38 +++++++++++++ 10 files changed, 129 insertions(+) diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index 1d0f3dea30a..6118bcd04ca 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -7,6 +7,9 @@ pub(crate) struct MetricsBatch { /// Number of times the worker parked. park_count: u64, + /// Number of times the worker parked and unparked. + park_unpark_count: u64, + /// Number of times the worker woke w/o doing work. noop_count: u64, @@ -54,6 +57,7 @@ impl MetricsBatch { MetricsBatch { park_count: 0, + park_unpark_count: 0, noop_count: 0, steal_count: 0, steal_operations: 0, @@ -76,6 +80,9 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, worker: &WorkerMetrics, mean_poll_time: u64) { worker.mean_poll_time.store(mean_poll_time, Relaxed); worker.park_count.store(self.park_count, Relaxed); + worker + .park_unpark_count + .store(self.park_unpark_count, Relaxed); worker.noop_count.store(self.noop_count, Relaxed); worker.steal_count.store(self.steal_count, Relaxed); worker @@ -101,6 +108,7 @@ impl MetricsBatch { /// The worker is about to park. 
pub(crate) fn about_to_park(&mut self) { self.park_count += 1; + self.park_unpark_count += 1; if self.poll_count_on_last_park == self.poll_count { self.noop_count += 1; @@ -109,6 +117,11 @@ impl MetricsBatch { } } + /// The worker was unparked. + pub(crate) fn unparked(&mut self) { + self.park_unpark_count += 1; + } + /// Start processing a batch of tasks pub(crate) fn start_processing_scheduled_tasks(&mut self) { self.processing_scheduled_tasks_started_at = Instant::now(); diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs index e4bb3a99d0c..f4dc116539b 100644 --- a/tokio/src/runtime/metrics/mock.rs +++ b/tokio/src/runtime/metrics/mock.rs @@ -39,6 +39,7 @@ impl MetricsBatch { pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {} pub(crate) fn about_to_park(&mut self) {} + pub(crate) fn unparked(&mut self) {} pub(crate) fn inc_local_schedule_count(&mut self) {} pub(crate) fn start_processing_scheduled_tasks(&mut self) {} pub(crate) fn end_processing_scheduled_tasks(&mut self) {} diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index fdbda6f3cb9..5bb79927a82 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -242,6 +242,61 @@ impl RuntimeMetrics { .load(Relaxed) } + /// Returns the total number of times the given worker thread has parked + /// and unparked. + /// + /// The worker park/unpark count starts at zero when the runtime is created + /// and increases by one each time the worker parks the thread waiting for + /// new inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. When new work becomes available, + /// the worker is unparked and the park/unpark count is again increased by one. + /// + /// An odd count means that the worker is currently parked. + /// An even count means that the worker is currently active. 
+ /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be at least 0 and less than `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// let n = metrics.worker_park_unpark_count(0); + /// + /// println!("worker 0 parked and unparked {} times", n); + /// + /// if n % 2 == 0 { + /// println!("worker 0 is active"); + /// } else { + /// println!("worker 0 is parked"); + /// } + /// } + /// ``` + pub fn worker_park_unpark_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .park_unpark_count + .load(Relaxed) + } + + /// Returns the number of times the given worker thread unparked but /// performed no work before parking again. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index e5d2c6f17fd..02dddc85247 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -16,6 +16,9 @@ pub(crate) struct WorkerMetrics { /// Number of times the worker parked. pub(crate) park_count: MetricAtomicU64, + /// Number of times the worker parked and unparked. + pub(crate) park_unpark_count: MetricAtomicU64, + /// Number of times the worker woke then parked again without doing work. 
pub(crate) noop_count: MetricAtomicU64, diff --git a/tokio/src/runtime/scheduler/current_thread/mod.rs b/tokio/src/runtime/scheduler/current_thread/mod.rs index 42a0a8822f4..4a3d849d264 100644 --- a/tokio/src/runtime/scheduler/current_thread/mod.rs +++ b/tokio/src/runtime/scheduler/current_thread/mod.rs @@ -368,6 +368,9 @@ impl Context { }); core = c; + + core.metrics.unparked(); + core.submit_metrics(handle); } if let Some(f) = &handle.shared.config.after_unpark { diff --git a/tokio/src/runtime/scheduler/multi_thread/stats.rs b/tokio/src/runtime/scheduler/multi_thread/stats.rs index 9d495706e8d..c59d4373ab8 100644 --- a/tokio/src/runtime/scheduler/multi_thread/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread/stats.rs @@ -74,6 +74,10 @@ impl Stats { self.batch.about_to_park(); } + pub(crate) fn unparked(&mut self) { + self.batch.unparked(); + } + pub(crate) fn inc_local_schedule_count(&mut self) { self.batch.inc_local_schedule_count(); } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 8ef487b09fd..d71e62df53a 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -699,8 +699,13 @@ impl Context { if core.transition_to_parked(&self.worker) { while !core.is_shutdown && !core.is_traced { core.stats.about_to_park(); + core.stats + .submit(&self.worker.handle.shared.worker_metrics[self.worker.index]); + core = self.park_timeout(core, None); + core.stats.unparked(); + // Run regularly scheduled maintenance core.maintenance(&self.worker); diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs index c2045602797..b7fdd82c9ef 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/stats.rs @@ -100,6 +100,10 @@ impl Stats { self.batch.about_to_park(); } + pub(crate) fn unparked(&mut self) { + 
self.batch.unparked(); + } + pub(crate) fn inc_local_schedule_count(&mut self) { self.batch.inc_local_schedule_count(); } diff --git a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs index 9ceb7815a53..8f07b84297a 100644 --- a/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread_alt/worker.rs @@ -658,6 +658,9 @@ impl Worker { let n = cmp::max(core.run_queue.remaining_slots() / 2, 1); let maybe_task = self.next_remote_task_batch_synced(cx, &mut synced, &mut core, n); + core.stats.unparked(); + self.flush_metrics(cx, &mut core); + Ok((maybe_task, core)) } diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 6640c524a69..2e51edee4d9 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -170,6 +170,44 @@ fn worker_park_count() { assert!(1 <= metrics.worker_park_count(1)); } +#[test] +fn worker_park_unpark_count() { + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(rt.spawn(async {})).unwrap(); + drop(rt); + assert!(2 <= metrics.worker_park_unpark_count(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + + // Wait for workers to be parked after runtime startup. + for _ in 0..100 { + if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert_eq!(1, metrics.worker_park_unpark_count(0)); + assert_eq!(1, metrics.worker_park_unpark_count(1)); + + // Spawn a task to unpark and then park a worker. 
+ rt.block_on(rt.spawn(async {})).unwrap(); + for _ in 0..100 { + if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1)); + + // Both threads unpark for runtime shutdown. + drop(rt); + assert_eq!(0, metrics.worker_park_unpark_count(0) % 2); + assert_eq!(0, metrics.worker_park_unpark_count(1) % 2); + assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1)); +} + #[test] fn worker_noop_count() { // There isn't really a great way to generate no-op parks as they happen as