diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs index e0ccf3e358c..00f9c98982a 100644 --- a/tokio/src/runtime/metrics/batch.rs +++ b/tokio/src/runtime/metrics/batch.rs @@ -14,11 +14,9 @@ pub(crate) struct MetricsBatch { /// Instant at which work last resumed (continued after park). processing_scheduled_tasks_started_at: Instant, - #[cfg(tokio_unstable)] /// Number of times the worker parked. park_count: u64, - #[cfg(tokio_unstable)] /// Number of times the worker parked and unparked. park_unpark_count: u64, @@ -80,6 +78,8 @@ impl MetricsBatch { MetricsBatch { busy_duration_total: 0, processing_scheduled_tasks_started_at: now, + park_count: 0, + park_unpark_count: 0, } } }, @@ -120,7 +120,12 @@ impl MetricsBatch { cfg_metrics_variant! { stable: { #[inline(always)] - fn submit_unstable(&mut self, _worker: &WorkerMetrics, _mean_poll_time: u64) {} + fn submit_unstable(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) { + worker.park_count.store(self.park_count, Relaxed); + worker + .park_unpark_count + .store(self.park_unpark_count, Relaxed); + } }, unstable: { #[inline(always)] @@ -153,7 +158,10 @@ impl MetricsBatch { cfg_metrics_variant! { stable: { /// The worker is about to park. - pub(crate) fn about_to_park(&mut self) {} + pub(crate) fn about_to_park(&mut self) { + self.park_count += 1; + self.park_unpark_count += 1; + } }, unstable: { /// The worker is about to park. @@ -171,18 +179,9 @@ impl MetricsBatch { } } } - - cfg_metrics_variant! { - stable: { - /// The worker was unparked. - pub(crate) fn unparked(&mut self) {} - }, - unstable: { - /// The worker was unparked. - pub(crate) fn unparked(&mut self) { - self.park_unpark_count += 1; - } - } + /// The worker was unparked. + pub(crate) fn unparked(&mut self) { + self.park_unpark_count += 1; } /// Start processing a batch of tasks diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index b8f71a95f04..188425e7e11 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -143,6 +143,103 @@ impl RuntimeMetrics { .load(Relaxed); Duration::from_nanos(nanos) } + + /// Returns the total number of times the given worker thread has parked. + /// + /// The worker park count starts at zero when the runtime is created and + /// increases by one each time the worker parks the thread waiting for new + /// inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.worker_park_count(0); + /// println!("worker 0 parked {} times", n); + /// } + /// ``` + pub fn worker_park_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .park_count + .load(Relaxed) + } + + /// Returns the total number of times the given worker thread has parked + /// and unparked. + /// + /// The worker park/unpark count starts at zero when the runtime is created + /// and increases by one each time the worker parks the thread waiting for + /// new inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. When new work becomes available, + /// the worker is unparked and the park/unpark count is again increased by one. + /// + /// An odd count means that the worker is currently parked. + /// An even count means that the worker is currently active. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. The index uniquely identifies a single + /// worker and will continue to identify the worker throughout the lifetime + /// of the runtime instance. + /// + /// # Panics + /// + /// The method panics when `worker` represents an invalid worker, i.e. is + /// greater than or equal to `num_workers()`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// let n = metrics.worker_park_unpark_count(0); + /// + /// println!("worker 0 parked and unparked {} times", n); + /// + /// if n % 2 == 0 { + /// println!("worker 0 is active"); + /// } else { + /// println!("worker 0 is parked"); + /// } + /// } + /// ``` + pub fn worker_park_unpark_count(&self, worker: usize) -> u64 { + self.handle + .inner + .worker_metrics(worker) + .park_unpark_count + .load(Relaxed) + } } cfg_unstable_metrics! { @@ -318,104 +415,6 @@ impl RuntimeMetrics { .load(Relaxed) } - /// Returns the total number of times the given worker thread has parked. - /// - /// The worker park count starts at zero when the runtime is created and - /// increases by one each time the worker parks the thread waiting for new - /// inbound events to process. This usually means the worker has processed - /// all pending work and is currently idle. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// - /// let n = metrics.worker_park_count(0); - /// println!("worker 0 parked {} times", n); - /// } - /// ``` - pub fn worker_park_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .park_count - .load(Relaxed) - } - - /// Returns the total number of times the given worker thread has parked - /// and unparked. - /// - /// The worker park/unpark count starts at zero when the runtime is created - /// and increases by one each time the worker parks the thread waiting for - /// new inbound events to process. This usually means the worker has processed - /// all pending work and is currently idle. When new work becomes available, - /// the worker is unparked and the park/unpark count is again increased by one. - /// - /// An odd count means that the worker is currently parked. - /// An even count means that the worker is currently active. - /// - /// The counter is monotonically increasing. It is never decremented or - /// reset to zero. - /// - /// # Arguments - /// - /// `worker` is the index of the worker being queried. The given value must - /// be between 0 and `num_workers()`. The index uniquely identifies a single - /// worker and will continue to identify the worker throughout the lifetime - /// of the runtime instance. - /// - /// # Panics - /// - /// The method panics when `worker` represents an invalid worker, i.e. is - /// greater than or equal to `num_workers()`. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let metrics = Handle::current().metrics(); - /// let n = metrics.worker_park_unpark_count(0); - /// - /// println!("worker 0 parked and unparked {} times", n); - /// - /// if n % 2 == 0 { - /// println!("worker 0 is active"); - /// } else { - /// println!("worker 0 is parked"); - /// } - /// } - /// ``` - pub fn worker_park_unpark_count(&self, worker: usize) -> u64 { - self.handle - .inner - .worker_metrics(worker) - .park_unpark_count - .load(Relaxed) - } - - /// Returns the number of times the given worker thread unparked but /// performed no work before parking again. /// diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs index 2ecd53f6791..30926b2a6c2 100644 --- a/tokio/src/runtime/metrics/worker.rs +++ b/tokio/src/runtime/metrics/worker.rs @@ -28,11 +28,9 @@ pub(crate) struct WorkerMetrics { /// Thread id of worker thread. thread_id: Mutex>, - #[cfg(tokio_unstable)] /// Number of times the worker parked. pub(crate) park_count: MetricAtomicU64, - #[cfg(tokio_unstable)] /// Number of times the worker parked and unparked. pub(crate) park_unpark_count: MetricAtomicU64, diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index cf0900c1e6f..2b0ac933f1b 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -4,6 +4,7 @@ use std::sync::mpsc; use std::time::Duration; use tokio::runtime::Runtime; +use tokio::time; #[test] fn num_workers() { @@ -125,6 +126,64 @@ fn worker_total_busy_duration() { } } +#[test] +fn worker_park_count() { + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(1 <= metrics.worker_park_count(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + rt.block_on(async { + time::sleep(Duration::from_millis(1)).await; + }); + drop(rt); + assert!(1 <= metrics.worker_park_count(0)); + assert!(1 <= metrics.worker_park_count(1)); +} + +#[test] +fn worker_park_unpark_count() { + let rt = current_thread(); + let metrics = rt.metrics(); + rt.block_on(rt.spawn(async {})).unwrap(); + drop(rt); + assert!(2 <= metrics.worker_park_unpark_count(0)); + + let rt = threaded(); + let metrics = rt.metrics(); + + // Wait for workers to be parked after runtime startup. + for _ in 0..100 { + if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert_eq!(1, metrics.worker_park_unpark_count(0)); + assert_eq!(1, metrics.worker_park_unpark_count(1)); + + // Spawn a task to unpark and then park a worker. + rt.block_on(rt.spawn(async {})).unwrap(); + for _ in 0..100 { + if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1)); + + // Both threads unpark for runtime shutdown. + drop(rt); + assert_eq!(0, metrics.worker_park_unpark_count(0) % 2); + assert_eq!(0, metrics.worker_park_unpark_count(1) % 2); + assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1)); +} + fn try_block_threaded(rt: &Runtime) -> Result>, mpsc::RecvTimeoutError> { let (tx, rx) = mpsc::channel(); diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index 05486e56b6c..a8246282126 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -212,64 +212,6 @@ fn worker_thread_id_threaded() { .unwrap() } -#[test] -fn worker_park_count() { - let rt = current_thread(); - let metrics = rt.metrics(); - rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; - }); - drop(rt); - assert!(1 <= metrics.worker_park_count(0)); - - let rt = threaded(); - let metrics = rt.metrics(); - rt.block_on(async { - time::sleep(Duration::from_millis(1)).await; - }); - drop(rt); - assert!(1 <= metrics.worker_park_count(0)); - assert!(1 <= metrics.worker_park_count(1)); -} - -#[test] -fn worker_park_unpark_count() { - let rt = current_thread(); - let metrics = rt.metrics(); - rt.block_on(rt.spawn(async {})).unwrap(); - drop(rt); - assert!(2 <= metrics.worker_park_unpark_count(0)); - - let rt = threaded(); - let metrics = rt.metrics(); - - // Wait for workers to be parked after runtime startup. - for _ in 0..100 { - if 1 <= metrics.worker_park_unpark_count(0) && 1 <= metrics.worker_park_unpark_count(1) { - break; - } - std::thread::sleep(std::time::Duration::from_millis(100)); - } - assert_eq!(1, metrics.worker_park_unpark_count(0)); - assert_eq!(1, metrics.worker_park_unpark_count(1)); - - // Spawn a task to unpark and then park a worker. - rt.block_on(rt.spawn(async {})).unwrap(); - for _ in 0..100 { - if 3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1) { - break; - } - std::thread::sleep(std::time::Duration::from_millis(100)); - } - assert!(3 <= metrics.worker_park_unpark_count(0) || 3 <= metrics.worker_park_unpark_count(1)); - - // Both threads unpark for runtime shutdown. - drop(rt); - assert_eq!(0, metrics.worker_park_unpark_count(0) % 2); - assert_eq!(0, metrics.worker_park_unpark_count(1) % 2); - assert!(4 <= metrics.worker_park_unpark_count(0) || 4 <= metrics.worker_park_unpark_count(1)); -} - #[test] fn worker_noop_count() { // There isn't really a great way to generate no-op parks as they happen as