Skip to content

Commit

Permalink
feat: add task counter pairs
Browse files Browse the repository at this point in the history
  • Loading branch information
conradludgate committed Oct 27, 2023
1 parent cc86fef commit c106692
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 0 deletions.
38 changes: 38 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,44 @@ impl RuntimeMetrics {
self.handle.inner.active_tasks_count()
}

/// Returns the number of started tasks in the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.start_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn start_tasks_count(&self) -> u64 {
self.handle.inner.start_tasks_count()
}

/// Returns the number of finished tasks in the runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let n = metrics.stop_tasks_count();
/// println!("Runtime has {} active tasks", n);
/// }
/// ```
pub fn stop_tasks_count(&self) -> u64 {
self.handle.inner.stop_tasks_count()
}

/// Returns the number of idle threads, which have spawned by the runtime
/// for `spawn_blocking` calls.
///
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,14 @@ cfg_metrics! {
pub(crate) fn active_tasks_count(&self) -> usize {
self.shared.owned.active_tasks_count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.shared.owned.start_tasks_count()
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.shared.owned.stop_tasks_count()
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ cfg_rt! {
match_flavor!(self, Handle(handle) => handle.active_tasks_count())
}

pub(crate) fn start_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.start_tasks_count())
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
match_flavor!(self, Handle(handle) => handle.stop_tasks_count())
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
}
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.shared.owned.start_tasks_count()
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.shared.owned.stop_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ impl Handle {
self.shared.owned.active_tasks_count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.shared.owned.start_tasks_count()
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.shared.owned.stop_tasks_count()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/task/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ cfg_not_has_atomic_u64! {
pub(crate) struct OwnedTasks<S: 'static> {
inner: Mutex<CountedOwnedTasksInner<S>>,
pub(crate) id: NonZeroU64,
pub(crate) tasks_start_count: AtomicU64,
pub(crate) tasks_stop_count: AtomicU64,
}
struct CountedOwnedTasksInner<S: 'static> {
list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
Expand All @@ -80,6 +82,8 @@ impl<S: 'static> OwnedTasks<S> {
closed: false,
}),
id: get_next_id(),
tasks_start_count: AtomicU64::new(0),
tasks_stop_count: AtomicU64::new(0),
}
}

Expand Down Expand Up @@ -120,6 +124,8 @@ impl<S: 'static> OwnedTasks<S> {
None
} else {
lock.list.push_front(task);
self.tasks_start_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Some(notified)
}
}
Expand Down Expand Up @@ -170,13 +176,26 @@ impl<S: 'static> OwnedTasks<S> {
self.inner.lock().list.count()
}

pub(crate) fn start_tasks_count(&self) -> u64 {
self.tasks_start_count
.load(std::sync::atomic::Ordering::Relaxed)
}

pub(crate) fn stop_tasks_count(&self) -> u64 {
self.tasks_stop_count
.load(std::sync::atomic::Ordering::Relaxed)
}

pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
// If the task's owner ID is `None` then it is not part of any list and
// doesn't need removing.
let task_id = task.header().get_owner_id()?;

assert_eq!(task_id, self.id);

self.tasks_stop_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

// safety: We just checked that the provided task is not in some other
// linked list.
unsafe { self.inner.lock().list.remove(task.header_ptr()) }
Expand Down
31 changes: 31 additions & 0 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,37 @@ fn active_tasks_count() {
});
}

#[test]
fn active_tasks_count_pairs() {
let rt = current_thread();
let metrics = rt.metrics();
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(0, metrics.active_tasks_count());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
}))
.unwrap();

assert_eq!(1, rt.metrics().start_tasks_count());
assert_eq!(1, rt.metrics().stop_tasks_count());

let rt = threaded();
let metrics = rt.metrics();
assert_eq!(0, metrics.stop_tasks_count());
assert_eq!(0, metrics.active_tasks_count());

rt.block_on(rt.spawn(async move {
assert_eq!(1, metrics.start_tasks_count());
assert_eq!(0, metrics.stop_tasks_count());
}))
.unwrap();

assert_eq!(1, rt.metrics().start_tasks_count());
assert_eq!(1, rt.metrics().stop_tasks_count());
}

#[test]
fn remote_schedule_count() {
use std::thread;
Expand Down

0 comments on commit c106692

Please sign in to comment.