Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ ron = "0.6.2"
serde = { version = "1", features = ["derive"] }
# Needed to poll Task examples
futures-lite = "1.11.3"
futures-timer = "3.0.2"
lazy_static = "1"
noop-waker = "0.1"

[[example]]
name = "hello_world"
Expand Down Expand Up @@ -243,6 +246,10 @@ path = "examples/asset/hot_asset_reloading.rs"
name = "async_compute"
path = "examples/async_tasks/async_compute.rs"

[[example]]
name = "async_bench"
path = "examples/async_tasks/async_bench.rs"

# Audio
[[example]]
name = "audio"
Expand Down
7 changes: 7 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ git checkout v0.4.0
- [Application](#application)
- [Assets](#assets)
- [Async Tasks](#async-tasks)
- [Async Task Benchmark](#async-task-benchmark)
- [Audio](#audio)
- [Diagnostics](#diagnostics)
- [ECS (Entity Component System)](#ecs-entity-component-system)
Expand Down Expand Up @@ -138,6 +139,12 @@ Example | File | Description
--- | --- | ---
`async_compute` | [`async_tasks/async_compute.rs`](async_tasks/async_compute.rs) | How to use `AsyncComputeTaskPool` to complete longer running tasks

## Async Task Benchmark

Example | File | Description
--- | --- | ---
`async_bench` | [`async_tasks/async_bench.rs`](async_tasks/async_bench.rs) | Compare perforance between using single block_on and multiple block_on(s) when dealing with massive custom async tasks

## Audio

Example | File | Description
Expand Down
308 changes: 308 additions & 0 deletions examples/async_tasks/async_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
use bevy::{
app::{AppExit, ScheduleRunnerSettings},
prelude::*,
tasks::{AsyncComputeTaskPool, Task},
};
use core::task::Context;
use futures_lite::{
future::{block_on, poll_once},
Future,
};
use std::{
sync::{Arc, LockResult, RwLock, RwLockReadGuard},
task::{Poll, Waker},
time::{Duration, Instant},
};

const TASK_DURATION_SEC: f32 = 0.5;
const FPS: f64 = 120.0;
const FRAME_STEP: f64 = 1.0 / FPS;
const N_STEPS: usize = 10;
const N_TASKS: usize = 100000;

struct FrameCounter {
pub n_frames: usize,
}

// This example benchmarks performance of concurrent custom task handling
// Run with release build: cargo run --release --example async_bench
// Example output:
// windows:
// [no_poll_once] n_frames executed: 238, avg fps: 39.3(target:120), duration: 6.048s
// [noop_waker] n_frames executed: 161, avg fps: 25.7(target:120), duration: 6.253s
// [handle_tasks] n_frames executed: 54, avg fps: 9.4(target:120), duration: 5.743s
// [handle_tasks_par] n_frames executed: 124, avg fps: 21.5(target:120), duration: 5.767s
// [handle_tasks_par_2] n_frames executed: 90, avg fps: 15.4(target:120), duration: 5.835s
fn main() {
App::new()
.insert_resource(ScheduleRunnerSettings::run_loop(Duration::from_secs_f64(
FRAME_STEP,
)))
.insert_resource(FrameCounter { n_frames: 0 })
.add_plugins(MinimalPlugins)
.add_startup_system(spawn_tasks_no_poll_once)
.add_system_to_stage(CoreStage::First, count_frame)
.add_system(handle_tasks_no_poll_once)
.run();
App::new()
.insert_resource(ScheduleRunnerSettings::run_loop(Duration::from_secs_f64(
FRAME_STEP,
)))
.insert_resource(FrameCounter { n_frames: 0 })
.add_plugins(MinimalPlugins)
.add_startup_system(spawn_tasks_noop_waker)
.add_system_to_stage(CoreStage::First, count_frame)
.add_system(handle_tasks_noop_waker)
.run();
for handle_tasks_system in [handle_tasks, handle_tasks_par, handle_tasks_par_2] {
App::new()
.insert_resource(ScheduleRunnerSettings::run_loop(Duration::from_secs_f64(
FRAME_STEP,
)))
.insert_resource(FrameCounter { n_frames: 0 })
.add_plugins(MinimalPlugins)
.add_startup_system(spawn_tasks)
.add_system_to_stage(CoreStage::First, count_frame)
.add_system(handle_tasks_system)
.run();
}
}

fn spawn_tasks(mut commands: Commands, thread_pool: Res<AsyncComputeTaskPool>) {
for step in 0..N_STEPS {
for _i in 0..N_TASKS {
let task = thread_pool.spawn(async move {
let start_time = Instant::now();
let duration = Duration::from_secs_f32(TASK_DURATION_SEC * (step as f32));
while Instant::now() - start_time < duration {
futures_timer::Delay::new(Duration::from_secs_f32(0.1)).await
}
true
});
commands.spawn().insert(task);
}
}
}

fn count_frame(mut frame_counter: ResMut<FrameCounter>) {
frame_counter.n_frames += 1;
}

fn handle_tasks(
mut commands: Commands,
mut transform_tasks: Query<(Entity, &mut Task<bool>)>,
mut app_exit_events: EventWriter<AppExit>,
time: Res<Time>,
frame_counter: Res<FrameCounter>,
) {
let mut n_tasks = 0;
for (entity, mut task) in transform_tasks.iter_mut() {
n_tasks += 1;
let ret = block_on(async { poll_once(&mut *task).await });
if ret.is_some() {
commands.entity(entity).remove::<Task<bool>>();
}
}
if n_tasks == 0 {
print_statistics("handle_tasks", &frame_counter, &time);
app_exit_events.send(AppExit);
}
}

fn handle_tasks_par(
mut commands: Commands,
mut transform_tasks: Query<(Entity, &mut Task<bool>)>,
mut app_exit_events: EventWriter<AppExit>,
time: Res<Time>,
frame_counter: Res<FrameCounter>,
) {
let futures = transform_tasks
.iter_mut()
.map(|(entity, mut task)| async move {
if poll_once(&mut *task).await.is_some() {
Some(entity)
} else {
None
}
});
let mut n_tasks = 0;
block_on(async {
for f in futures {
n_tasks += 1;
if let Some(entity) = f.await {
commands.entity(entity).remove::<Task<bool>>();
}
}
});
if n_tasks == 0 {
print_statistics("handle_tasks_par", &frame_counter, &time);
app_exit_events.send(AppExit);
}
}

fn handle_tasks_par_2(
mut commands: Commands,
mut transform_tasks: Query<(Entity, &mut Task<bool>)>,
mut app_exit_events: EventWriter<AppExit>,
time: Res<Time>,
frame_counter: Res<FrameCounter>,
) {
let futures = transform_tasks
.iter_mut()
.map(|(entity, mut task)| async move {
if poll_once(&mut *task).await.is_some() {
Some(entity)
} else {
None
}
})
.collect::<Vec<_>>();
let n_tasks = futures.len();
block_on(async {
for f in futures {
if let Some(entity) = f.await {
commands.entity(entity).remove::<Task<bool>>();
}
}
});
if n_tasks == 0 {
print_statistics("handle_tasks_par_2", &frame_counter, &time);
app_exit_events.send(AppExit);
}
}

fn print_statistics(name: &str, frame_counter: &Res<FrameCounter>, time: &Res<Time>) {
let duration_sec = time.seconds_since_startup();
println!(
"{:width$}n_frames executed: {}, avg fps: {:.1}(target:{}), duration: {:.3}s",
format!("[{}]", name),
frame_counter.n_frames,
(frame_counter.n_frames as f64) / duration_sec,
FPS,
duration_sec,
width = 22,
);
}

#[derive(Debug)]
struct TaskWrapper<T> {
result: Arc<RwLock<Option<T>>>,
_task: Task<()>,
}

impl<T> TaskWrapper<T> {
pub fn new(result: Arc<RwLock<Option<T>>>, task: Task<()>) -> Self {
Self {
result,
_task: task,
}
}

pub fn poll(&self) -> LockResult<RwLockReadGuard<Option<T>>> {
self.result.read()
}
}

fn spawn_tasks_no_poll_once(mut commands: Commands, thread_pool: Res<AsyncComputeTaskPool>) {
for step in 0..N_STEPS {
for _i in 0..N_TASKS {
let result: Arc<RwLock<Option<()>>> = Arc::new(RwLock::new(None));
let result_clone = result.clone();
let task = thread_pool.spawn(async move {
let start_time = Instant::now();
let duration = Duration::from_secs_f32(TASK_DURATION_SEC * (step as f32));
while Instant::now() - start_time < duration {
futures_timer::Delay::new(Duration::from_secs_f32(0.1)).await;
}
// println!("spawn_tasks_no_poll_once");
let mut locked = result_clone.write().unwrap();
*locked = Some(());
});
let wrapper = TaskWrapper::new(result, task);
commands.spawn().insert(wrapper);
}
}
}

fn handle_tasks_no_poll_once(
mut commands: Commands,
transform_tasks: Query<(Entity, &TaskWrapper<()>)>,
mut app_exit_events: EventWriter<AppExit>,
time: Res<Time>,
frame_counter: Res<FrameCounter>,
) {
let mut n_tasks = 0;
for (entity, task) in transform_tasks.iter() {
n_tasks += 1;
let locked = task.poll().unwrap();
if locked.is_some() {
commands.entity(entity).remove::<TaskWrapper<()>>();
}
}
if n_tasks == 0 {
print_statistics("no_poll_once", &frame_counter, &time);
app_exit_events.send(AppExit);
}
}

lazy_static::lazy_static! {
static ref NOOP_WAKER: Waker = noop_waker::noop_waker();
}

#[derive(Debug)]
struct TaskWrapper2<T> {
task: Task<T>,
}

impl<T: Clone> TaskWrapper2<T> {
pub fn new(task: Task<T>) -> Self {
Self { task }
}

pub fn poll(&mut self) -> Option<T> {
let f = &mut self.task;
futures_lite::pin!(f);
let mut noop_ctx = Context::from_waker(&NOOP_WAKER);
match f.poll(&mut noop_ctx) {
Poll::Ready(o) => Some(o.clone()),
Poll::Pending => None,
}
}
}

fn spawn_tasks_noop_waker(mut commands: Commands, thread_pool: Res<AsyncComputeTaskPool>) {
for step in 0..N_STEPS {
for _i in 0..N_TASKS {
let task = thread_pool.spawn(async move {
let start_time = Instant::now();
let duration = Duration::from_secs_f32(TASK_DURATION_SEC * (step as f32));
while Instant::now() - start_time < duration {
futures_timer::Delay::new(Duration::from_secs_f32(0.1)).await;
}
// println!("spawn_tasks_noop_waker");
});
let wrapper = TaskWrapper2::<()>::new(task);
commands.spawn().insert(wrapper);
}
}
}

fn handle_tasks_noop_waker(
mut commands: Commands,
mut transform_tasks: Query<(Entity, &mut TaskWrapper2<()>)>,
mut app_exit_events: EventWriter<AppExit>,
time: Res<Time>,
frame_counter: Res<FrameCounter>,
) {
let mut n_tasks = 0;
for (entity, mut task) in transform_tasks.iter_mut() {
n_tasks += 1;
if task.poll().is_some() {
commands.entity(entity).remove::<TaskWrapper2<()>>();
}
}
if n_tasks == 0 {
print_statistics("noop_waker", &frame_counter, &time);
app_exit_events.send(AppExit);
}
}