Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: expand on runtime metrics #4373

Merged
merged 24 commits into from
Jan 22, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
more tests
carllerche committed Jan 18, 2022
commit 2bae9e84d473c3e17ff128d2c8c9a4f5a0f11d72
4 changes: 3 additions & 1 deletion tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
@@ -512,7 +512,9 @@ impl CoreGuard<'_> {

'outer: loop {
if core.spawner.reset_woken() {
let (c, res) = context.run_task(core, || future.as_mut().poll(&mut cx));
let (c, res) = context.enter(core, || {
crate::coop::budget(|| future.as_mut().poll(&mut cx))
});

core = c;

1 change: 0 additions & 1 deletion tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
@@ -126,7 +126,6 @@ rt_test! {
n += 1;

assert!(park_count >= noop_count);
assert!(noop_count > 0);

assert_eq!(0, metrics.worker_steal_count(i));
assert_eq!(0, metrics.worker_local_schedule_count(i));
163 changes: 154 additions & 9 deletions tokio/tests/rt_metrics.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,9 @@ fn remote_schedule_count() {
handle.spawn(async {
// DO nothing
})
}).join().unwrap();
})
.join()
.unwrap();

rt.block_on(task).unwrap();

@@ -35,11 +37,13 @@ fn remote_schedule_count() {
handle.spawn(async {
// DO nothing
})
}).join().unwrap();
})
.join()
.unwrap();

rt.block_on(task).unwrap();

assert_eq!(1, rt.metrics().remote_schedule_count());
assert_eq!(1, rt.metrics().remote_schedule_count());
}

#[test]
@@ -58,30 +62,171 @@ fn worker_park_count() {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
assert!(1 <= metrics.worker_park_count(0));
assert_eq!(1, metrics.worker_park_count(1));
assert!(1 <= metrics.worker_park_count(0));
assert!(1 <= metrics.worker_park_count(1));
}

#[test]
fn worker_noop_count() {
// There isn't really a great way to generate no-op parks as they happen as
// false-positive events under concurrency.

let rt = basic();
let metrics = rt.metrics();
rt.block_on(async {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
assert_eq!(1, metrics.worker_noop_count(0));
assert_eq!(2, metrics.worker_noop_count(0));

let rt = threaded();
let metrics = rt.metrics();
rt.block_on(async {
time::sleep(Duration::from_millis(1)).await;
});
drop(rt);
assert!(1 <= metrics.worker_noop_count(0));
assert_eq!(1, metrics.worker_noop_count(1));
assert!(1 <= metrics.worker_noop_count(0));
assert!(1 <= metrics.worker_noop_count(1));
}

#[test]
fn worker_steal_count() {
// This metric only applies to the multi-threaded runtime.
//
// We use a blocking channel to backup one worker thread.
use std::sync::mpsc::channel;

let rt = threaded();
let metrics = rt.metrics();

rt.block_on(async {
let (tx, rx) = channel();

// Move to the runtime.
tokio::spawn(async move {
// Spawn the task that sends to the channel
tokio::spawn(async move {
tx.send(()).unwrap();
});

// Spawn a task that bumps the previous task out of the "next
// scheduled" slot.
tokio::spawn(async {});

// Blocking receive on the channe.
rx.recv().unwrap();
})
.await
.unwrap();
});

drop(rt);

let n: u64 = (0..metrics.num_workers())
.map(|i| metrics.worker_steal_count(i))
.sum();

assert_eq!(1, n);
}

#[test]
fn worker_poll_count() {
const N: u64 = 5;

let rt = basic();
let metrics = rt.metrics();
rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {}).await.unwrap();
}
});
drop(rt);
assert_eq!(N, metrics.worker_poll_count(0));

let rt = threaded();
let metrics = rt.metrics();
rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {}).await.unwrap();
}
});
drop(rt);
// Account for the `block_on` task
let n = (0..metrics.num_workers())
.map(|i| metrics.worker_poll_count(i))
.sum();

assert_eq!(N, n);
}

#[test]
fn worker_total_busy_duration() {
const N: usize = 5;

let zero = Duration::from_millis(0);

let rt = basic();
let metrics = rt.metrics();

rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {
tokio::task::yield_now().await;
})
.await
.unwrap();
}
});

drop(rt);

assert!(zero < metrics.worker_total_busy_duration(0));

let rt = threaded();
let metrics = rt.metrics();

rt.block_on(async {
for _ in 0..N {
tokio::spawn(async {
tokio::task::yield_now().await;
})
.await
.unwrap();
}
});

drop(rt);

for i in 0..metrics.num_workers() {
assert!(zero < metrics.worker_total_busy_duration(i));
}
}

#[test]
fn worker_local_schedule_count() {
let rt = basic();
let metrics = rt.metrics();
rt.block_on(async {
tokio::spawn(async {}).await.unwrap();
});
drop(rt);

assert_eq!(1, metrics.worker_local_schedule_count(0));
assert_eq!(0, metrics.remote_schedule_count());
}

#[test]
#[ignore]
fn worker_overflow_count() {}

#[test]
#[ignore]
fn remote_queue_depth() {}

#[test]
#[ignore]
fn worker_local_queue_depth() {}

fn basic() -> Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -95,4 +240,4 @@ fn threaded() -> Runtime {
.enable_all()
.build()
.unwrap()
}
}