From aa757038d3df4029581585fae913090072a31686 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 10:47:26 -0700 Subject: [PATCH 01/95] Internalize task distinction into TaskPool --- crates/bevy_asset/src/asset_server.rs | 2 +- crates/bevy_asset/src/lib.rs | 4 +- crates/bevy_core/src/task_pool_options.rs | 82 +++++------ .../src/schedule/executor_parallel.rs | 8 +- crates/bevy_tasks/src/lib.rs | 5 +- crates/bevy_tasks/src/task_pool.rs | 130 +++++++++++++----- crates/bevy_tasks/src/usages.rs | 52 ------- 7 files changed, 138 insertions(+), 145 deletions(-) delete mode 100644 crates/bevy_tasks/src/usages.rs diff --git a/crates/bevy_asset/src/asset_server.rs b/crates/bevy_asset/src/asset_server.rs index 04d1f3bda07d4..4efd569b6e450 100644 --- a/crates/bevy_asset/src/asset_server.rs +++ b/crates/bevy_asset/src/asset_server.rs @@ -379,7 +379,7 @@ impl AssetServer { let owned_path = asset_path.to_owned(); self.server .task_pool - .spawn(async move { + .spawn_io(async move { if let Err(err) = server.load_async(owned_path, force).await { warn!("{}", err); } diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index 870f100d10306..6ee49a17ee8b4 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -30,7 +30,7 @@ pub use path::*; use bevy_app::{prelude::Plugin, App}; use bevy_ecs::schedule::{StageLabel, SystemStage}; -use bevy_tasks::IoTaskPool; +use bevy_tasks::TaskPool; /// The names of asset stages in an App Schedule #[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)] @@ -82,7 +82,7 @@ pub fn create_platform_default_asset_io(app: &mut App) -> Box { impl Plugin for AssetPlugin { fn build(&self, app: &mut App) { if !app.world.contains_resource::() { - let task_pool = app.world.resource::().0.clone(); + let task_pool = app.world.resource::().clone(); let source = create_platform_default_asset_io(app); diff --git a/crates/bevy_core/src/task_pool_options.rs b/crates/bevy_core/src/task_pool_options.rs index 19c9dad5bfe2f..dd2ef11b9ae79 100644 --- a/crates/bevy_core/src/task_pool_options.rs +++ b/crates/bevy_core/src/task_pool_options.rs @@ -1,5 +1,5 @@ use bevy_ecs::world::World; -use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder}; +use bevy_tasks::{TaskPool, TaskPoolBuilder}; use bevy_utils::tracing::trace; /// Defines a simple way to determine how many threads to use given the number of remaining cores @@ -100,54 +100,38 @@ impl DefaultTaskPoolOptions { let mut remaining_threads = total_threads; - if !world.contains_resource::() { - // Determine the number of IO threads we will use - let io_threads = self - .io - .get_number_of_threads(remaining_threads, total_threads); - - trace!("IO Threads: {}", io_threads); - remaining_threads = remaining_threads.saturating_sub(io_threads); - - world.insert_resource(IoTaskPool( - TaskPoolBuilder::default() - .num_threads(io_threads) - .thread_name("IO Task Pool".to_string()) - .build(), - )); - } - - if !world.contains_resource::() { - // Determine the number of async compute threads we will use - let async_compute_threads = self - .async_compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Async Compute Threads: {}", async_compute_threads); - remaining_threads = remaining_threads.saturating_sub(async_compute_threads); - - world.insert_resource(AsyncComputeTaskPool( - TaskPoolBuilder::default() - .num_threads(async_compute_threads) - .thread_name("Async Compute Task Pool".to_string()) - .build(), - )); - } - - if !world.contains_resource::() 
{ - // Determine the number of compute threads we will use - // This is intentionally last so that an end user can specify 1.0 as the percent - let compute_threads = self - .compute - .get_number_of_threads(remaining_threads, total_threads); - - trace!("Compute Threads: {}", compute_threads); - world.insert_resource(ComputeTaskPool( - TaskPoolBuilder::default() - .num_threads(compute_threads) - .thread_name("Compute Task Pool".to_string()) - .build(), - )); + if world.contains_resource::() { + return; } + // Determine the number of IO threads we will use + let io_threads = self + .io + .get_number_of_threads(remaining_threads, total_threads); + + trace!("IO Threads: {}", io_threads); + remaining_threads = remaining_threads.saturating_sub(io_threads); + + // Determine the number of async compute threads we will use + let async_compute_threads = self + .async_compute + .get_number_of_threads(remaining_threads, total_threads); + + trace!("Async Compute Threads: {}", async_compute_threads); + remaining_threads = remaining_threads.saturating_sub(async_compute_threads); + + // Determine the number of compute threads we will use + // This is intentionally last so that an end user can specify 1.0 as the percent + let compute_threads = self + .compute + .get_number_of_threads(remaining_threads, total_threads); + + world.insert_resource( + TaskPoolBuilder::default() + .compute_threads(compute_threads) + .async_compute_threads(async_compute_threads) + .io_threads(io_threads) + .thread_name("Task Pool".to_string()) + .build(), + ); } } diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 149d8d02bc2df..60e769a589855 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -5,7 +5,7 @@ use crate::{ world::World, }; use async_channel::{Receiver, Sender}; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_tasks::{Scope, TaskPool}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use fixedbitset::FixedBitSet; @@ -123,10 +123,10 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - let compute_pool = world - .get_resource_or_insert_with(|| ComputeTaskPool(TaskPool::default())) + let task_pool = world + .get_resource_or_insert_with(|| TaskPool::default()) .clone(); - compute_pool.scope(|scope| { + task_pool.scope(|scope| { self.prepare_systems(scope, systems, world); let parallel_executor = async { // All systems have been ran if there are no queued or running systems. 
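// Illustrative usage sketch of the unified `TaskPool` API introduced by this patch
// (not itself part of the diff). The thread counts and the dummy futures below are
// placeholders; only the builder methods and `spawn_*` methods added above are assumed.
use bevy_tasks::TaskPoolBuilder;
use futures_lite::future;

fn task_pool_usage_sketch() {
    // One pool replaces the former ComputeTaskPool / AsyncComputeTaskPool / IoTaskPool
    // resources; per-group thread counts are configured on the builder.
    let pool = TaskPoolBuilder::new()
        .compute_threads(4)
        .async_compute_threads(2)
        .io_threads(2)
        .thread_name("Task Pool".to_string())
        .build();

    // Replaces spawning on the separate IoTaskPool resource: routed to the IO-priority executor.
    let io_task = pool.spawn_io(async { 21 });

    // Replaces spawning on the AsyncComputeTaskPool resource; detach lets the task run to
    // completion without being polled by the caller.
    pool.spawn_async_compute(async { /* long-running, multi-frame work */ })
        .detach();

    // `spawn` keeps its old meaning: compute priority, as with the former ComputeTaskPool.
    let compute_task = pool.spawn(async { 21 });

    assert_eq!(future::block_on(io_task) + future::block_on(compute_task), 42);
}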
diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 7345d775ee968..1967f150f1ace 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -17,9 +17,6 @@ mod single_threaded_task_pool; #[cfg(target_arch = "wasm32")] pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder}; -mod usages; -pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; - mod iter; pub use iter::ParallelIterator; @@ -29,7 +26,7 @@ pub mod prelude { pub use crate::{ iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, - usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, + task_pool::TaskPool, }; } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ebd6ba6b41f4c..af237c74ad9b9 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -6,7 +6,7 @@ use std::{ thread::{self, JoinHandle}, }; -use futures_lite::{future, pin}; +use futures_lite::{future, pin, FutureExt}; use crate::Task; @@ -16,7 +16,9 @@ use crate::Task; pub struct TaskPoolBuilder { /// If set, we'll set up the thread pool to use at most n threads. Otherwise use /// the logical core count of the system - num_threads: Option, + compute_threads: Option, + async_compute_threads: Option, + io_threads: Option, /// If set, we'll use the given stack size rather than the system default stack_size: Option, /// Allows customizing the name of the threads - helpful for debugging. If set, threads will @@ -32,8 +34,18 @@ impl TaskPoolBuilder { /// Override the number of threads created for the pool. If unset, we default to the number /// of logical cores of the system - pub fn num_threads(mut self, num_threads: usize) -> Self { - self.num_threads = Some(num_threads); + pub fn compute_threads(mut self, num_threads: usize) -> Self { + self.compute_threads = Some(num_threads); + self + } + + pub fn async_compute_threads(mut self, num_threads: usize) -> Self { + self.async_compute_threads = Some(num_threads); + self + } + + pub fn io_threads(mut self, num_threads: usize) -> Self { + self.io_threads = Some(num_threads); self } @@ -52,11 +64,7 @@ impl TaskPoolBuilder { /// Creates a new [`TaskPool`] based on the current options. 
pub fn build(self) -> TaskPool { - TaskPool::new_internal( - self.num_threads, - self.stack_size, - self.thread_name.as_deref(), - ) + TaskPool::new_internal(self) } } @@ -89,7 +97,9 @@ pub struct TaskPool { /// This has to be separate from TaskPoolInner because we have to create an Arc to /// pass into the worker threads, and we must create the worker threads before we can create /// the Vec> contained within TaskPoolInner - executor: Arc>, + compute_executor: Arc>, + async_compute_executor: Arc>, + io_executor: Arc>, /// Inner state of the pool inner: Arc, @@ -105,27 +115,22 @@ impl TaskPool { TaskPoolBuilder::new().build() } - fn new_internal( - num_threads: Option, - stack_size: Option, - thread_name: Option<&str>, - ) -> Self { + fn new_internal(builder: TaskPoolBuilder) -> Self { let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - let executor = Arc::new(async_executor::Executor::new()); + let compute_executor = Arc::new(async_executor::Executor::new()); + let async_compute_executor = Arc::new(async_executor::Executor::new()); + let io_executor = Arc::new(async_executor::Executor::new()); - let num_threads = num_threads.unwrap_or_else(num_cpus::get); + let num_threads = builder.compute_threads.unwrap_or_else(num_cpus::get); let threads = (0..num_threads) .map(|i| { - let ex = Arc::clone(&executor); - let shutdown_rx = shutdown_rx.clone(); - // miri does not support setting thread names // TODO: change back when https://github.com/rust-lang/miri/issues/1717 is fixed #[cfg(not(miri))] let mut thread_builder = { - let thread_name = if let Some(thread_name) = thread_name { + let thread_name = if let Some(ref thread_name) = builder.thread_name { format!("{} ({})", thread_name, i) } else { format!("TaskPool ({})", i) @@ -139,22 +144,41 @@ impl TaskPool { thread::Builder::new() }; - if let Some(stack_size) = stack_size { + if let Some(stack_size) = builder.stack_size { thread_builder = thread_builder.stack_size(stack_size); } + async fn run_forever(executor: Arc>) { + loop { + while executor.try_tick() {} + future::yield_now().await; + } + } + + let compute = Arc::clone(&compute_executor); + let async_compute = Arc::clone(&compute_executor); + let io = Arc::clone(&io_executor); + let shutdown_rx = shutdown_rx.clone(); + thread_builder .spawn(move || { - let shutdown_future = ex.run(shutdown_rx.recv()); - // Use unwrap_err because we expect a Closed error - future::block_on(shutdown_future).unwrap_err(); + let future = run_forever(compute) + .or(run_forever(async_compute)) + .or(run_forever(io)) + .or(async { + // Use unwrap_err because we expect a Closed error + shutdown_rx.recv().await.unwrap_err(); + }); + future::block_on(future); }) .expect("Failed to spawn thread.") }) .collect(); Self { - executor, + compute_executor, + async_compute_executor, + io_executor, inner: Arc::new(TaskPoolInner { threads, shutdown_tx, @@ -182,12 +206,21 @@ impl TaskPool { // before this function returns. However, rust has no way of knowing // this so we must convert to 'static here to appease the compiler as it is unable to // validate safety. 
- let executor: &async_executor::Executor = &*self.executor; - let executor: &'scope async_executor::Executor = unsafe { mem::transmute(executor) }; + let compute_executor: &async_executor::Executor = &*self.compute_executor; + let compute_executor: &'scope async_executor::Executor = + unsafe { mem::transmute(compute_executor) }; + let async_compute_executor: &async_executor::Executor = &*self.async_compute_executor; + let async_compute_executor: &'scope async_executor::Executor = + unsafe { mem::transmute(async_compute_executor) }; + let io_executor: &async_executor::Executor = &*self.io_executor; + let io_executor: &'scope async_executor::Executor = + unsafe { mem::transmute(io_executor) }; let local_executor: &'scope async_executor::LocalExecutor = unsafe { mem::transmute(local_executor) }; let mut scope = Scope { - executor, + compute_executor, + async_compute_executor, + io_executor, local_executor, spawned: Vec::new(), }; @@ -230,7 +263,9 @@ impl TaskPool { break result; }; - self.executor.try_tick(); + self.compute_executor.try_tick(); + self.async_compute_executor.try_tick(); + self.io_executor.try_tick(); local_executor.try_tick(); } } @@ -246,7 +281,24 @@ impl TaskPool { where T: Send + 'static, { - Task::new(self.executor.spawn(future)) + Task::new(self.compute_executor.spawn(future)) + } + + pub fn spawn_async_compute( + &self, + future: impl Future + Send + 'static, + ) -> Task + where + T: Send + 'static, + { + Task::new(self.async_compute_executor.spawn(future)) + } + + pub fn spawn_io(&self, future: impl Future + Send + 'static) -> Task + where + T: Send + 'static, + { + Task::new(self.io_executor.spawn(future)) } /// Spawns a static future on the thread-local async executor for the current thread. The task @@ -273,7 +325,9 @@ impl Default for TaskPool { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, T> { - executor: &'scope async_executor::Executor<'scope>, + compute_executor: &'scope async_executor::Executor<'scope>, + async_compute_executor: &'scope async_executor::Executor<'scope>, + io_executor: &'scope async_executor::Executor<'scope>, local_executor: &'scope async_executor::LocalExecutor<'scope>, spawned: Vec>, } @@ -288,7 +342,17 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn + 'scope + Send>(&mut self, f: Fut) { - let task = self.executor.spawn(f); + let task = self.compute_executor.spawn(f); + self.spawned.push(task); + } + + pub fn spawn_async_compute + 'scope + Send>(&mut self, f: Fut) { + let task = self.async_compute_executor.spawn(f); + self.spawned.push(task); + } + + pub fn spawn_io + 'scope + Send>(&mut self, f: Fut) { + let task = self.io_executor.spawn(f); self.spawned.push(task); } diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs deleted file mode 100644 index 923c1a7eb4eab..0000000000000 --- a/crates/bevy_tasks/src/usages.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! Definitions for a few common task pools that we want. Generally the determining factor for what -//! kind of work should go in each pool is latency requirements. -//! -//! For CPU-intensive work (tasks that generally spin until completion) we have a standard -//! [`ComputeTaskPool`] and an [`AsyncComputeTaskPool`]. Work that does not need to be completed to -//! present the next frame should go to the [`AsyncComputeTaskPool`] -//! -//! For IO-intensive work (tasks that spend very little time in a "woken" state) we have an IO -//! task pool. 
The tasks here are expected to complete very quickly. Generally they should just -//! await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready -//! for consumption. (likely via channels) - -use super::TaskPool; -use std::ops::Deref; - -/// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next -/// frame -#[derive(Clone, Debug)] -pub struct ComputeTaskPool(pub TaskPool); - -impl Deref for ComputeTaskPool { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// A newtype for a task pool for CPU-intensive work that may span across multiple frames -#[derive(Clone, Debug)] -pub struct AsyncComputeTaskPool(pub TaskPool); - -impl Deref for AsyncComputeTaskPool { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a -/// "woken" state) -#[derive(Clone, Debug)] -pub struct IoTaskPool(pub TaskPool); - -impl Deref for IoTaskPool { - type Target = TaskPool; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} From 3f9786588d55fe14ebce265a3916cf9553bed37c Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 12:53:41 -0700 Subject: [PATCH 02/95] Basic priortization strategy --- crates/bevy_tasks/src/task_pool.rs | 158 +++++++++++++++++++---------- 1 file changed, 106 insertions(+), 52 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index af237c74ad9b9..9ccc11e4894c7 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -122,58 +122,73 @@ impl TaskPool { let async_compute_executor = Arc::new(async_executor::Executor::new()); let io_executor = Arc::new(async_executor::Executor::new()); - let num_threads = builder.compute_threads.unwrap_or_else(num_cpus::get); - - let threads = (0..num_threads) - .map(|i| { - // miri does not support setting thread names - // TODO: change back when https://github.com/rust-lang/miri/issues/1717 is fixed - #[cfg(not(miri))] - let mut thread_builder = { - let thread_name = if let Some(ref thread_name) = builder.thread_name { - format!("{} ({})", thread_name, i) - } else { - format!("TaskPool ({})", i) - }; - thread::Builder::new().name(thread_name) - }; - #[cfg(miri)] - let mut thread_builder = { - let _ = i; - let _ = thread_name; - thread::Builder::new() - }; - - if let Some(stack_size) = builder.stack_size { - thread_builder = thread_builder.stack_size(stack_size); - } - - async fn run_forever(executor: Arc>) { - loop { - while executor.try_tick() {} - future::yield_now().await; - } - } - - let compute = Arc::clone(&compute_executor); - let async_compute = Arc::clone(&compute_executor); - let io = Arc::clone(&io_executor); - let shutdown_rx = shutdown_rx.clone(); - - thread_builder - .spawn(move || { - let future = run_forever(compute) - .or(run_forever(async_compute)) - .or(run_forever(io)) - .or(async { - // Use unwrap_err because we expect a Closed error - shutdown_rx.recv().await.unwrap_err(); - }); - future::block_on(future); - }) - .expect("Failed to spawn thread.") - }) - .collect(); + let compute_threads = builder.compute_threads.unwrap_or(0); + let io_threads = builder.io_threads.unwrap_or(0); + let async_compute_threads = builder.async_compute_threads.unwrap_or_else(num_cpus::get); + + let mut threads = Vec::with_capacity(compute_threads + io_threads + async_compute_threads); + threads.extend((0..compute_threads).map(|i| { + let 
mut thread_builder = make_thread_builder( + builder.thread_name.as_deref(), + "Compute", + i, + builder.stack_size, + ); + let compute = Arc::clone(&compute_executor); + let shutdown_rx = shutdown_rx.clone(); + + thread_builder + .spawn(move || { + let future = run_forever(compute).or(async { + // Use unwrap_err because we expect a Closed error + shutdown_rx.recv().await.unwrap_err(); + }); + future::block_on(future); + }) + .expect("Failed to spawn thread.") + })); + threads.extend((0..io_threads).map(|i| { + let mut thread_builder = + make_thread_builder(builder.thread_name.as_deref(), "IO", i, builder.stack_size); + let compute = Arc::clone(&compute_executor); + let io = Arc::clone(&io_executor); + let shutdown_rx = shutdown_rx.clone(); + + thread_builder + .spawn(move || { + let future = run_forever(io).or(run_forever(compute)).or(async { + // Use unwrap_err because we expect a Closed error + shutdown_rx.recv().await.unwrap_err(); + }); + future::block_on(future); + }) + .expect("Failed to spawn thread.") + })); + threads.extend((0..compute_threads).map(|i| { + let mut thread_builder = make_thread_builder( + builder.thread_name.as_deref(), + "Aync Compute", + i, + builder.stack_size, + ); + let compute = Arc::clone(&compute_executor); + let async_compute = Arc::clone(&compute_executor); + let io = Arc::clone(&io_executor); + let shutdown_rx = shutdown_rx.clone(); + + thread_builder + .spawn(move || { + let future = run_forever(async_compute) + .or(run_forever(compute)) + .or(run_forever(io)) + .or(async { + // Use unwrap_err because we expect a Closed error + shutdown_rx.recv().await.unwrap_err(); + }); + future::block_on(future); + }) + .expect("Failed to spawn thread.") + })); Self { compute_executor, @@ -368,6 +383,45 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { } } +fn make_thread_builder( + thread_name: Option<&str>, + prefix: &'static str, + idx: usize, + stack_size: Option, +) -> thread::Builder { + // miri does not support setting thread names + // TODO: change back when https://github.com/rust-lang/miri/issues/1717 is fixed + #[cfg(not(miri))] + let mut thread_builder = { + let thread_name = if let Some(ref thread_name) = thread_name { + format!("{} ({}, {})", thread_name, prefix, idx) + } else { + format!("TaskPool ({}, {})", prefix, idx) + }; + thread::Builder::new().name(thread_name) + }; + + #[cfg(miri)] + let mut thread_builder = { + let _ = i; + let _ = thread_name; + thread::Builder::new() + }; + + if let Some(stack_size) = stack_size { + thread_builder = thread_builder.stack_size(stack_size); + } + + thread_builder +} + +async fn run_forever(executor: Arc>) { + loop { + while executor.try_tick() {} + future::yield_now().await; + } +} + #[cfg(test)] #[allow(clippy::blacklisted_name)] mod tests { From f0a70b97be9ecf24dc3b1ba729bbf5755456cbf4 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 13:36:22 -0700 Subject: [PATCH 03/95] Add missing docs --- .../src/single_threaded_task_pool.rs | 77 +++++++++++++++++++ crates/bevy_tasks/src/task_pool.rs | 66 +++++++++++++--- 2 files changed, 131 insertions(+), 12 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 757b711d999d3..acdfa903fe4a6 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -37,6 +37,20 @@ impl TaskPoolBuilder { /// A thread pool for executing tasks. 
Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. In this case - main thread only. +/// +/// # Scheduling Semantics +/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async +/// Compute. Compute is higher priority than IO, which are both higher priority than async compute. +/// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize +/// its specific tasks (i.e. IO tasks on a IO thread), but will run higher priority tasks if it would +/// otherwise be sitting idle. +/// +/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and +/// async compute thread groups, but any IO task will take precedence over any compute task on the IO +/// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread. +/// +/// By default, all threads in the pool are dedicated to compute group. Thread counts can be altered +/// via [`TaskPoolBuilder`] when constructing the pool. #[derive(Debug, Default, Clone)] pub struct TaskPool {} @@ -106,6 +120,47 @@ impl TaskPool { FakeTask } + /// Spawns a static future onto the JS event loop. For now it is returning FakeTask + /// instance with no-op detach method. Returning real Task is possible here, but tricky: + /// future is running on JS event loop, Task is running on async_executor::LocalExecutor + /// so some proxy future is needed. Moreover currently we don't have long-living + /// LocalExecutor here (above `spawn` implementation creates temporary one) + /// But for typical use cases it seems that current implementation should be sufficient: + /// caller can spawn long-running future writing results to some channel / event queue + /// and simply call detach on returned Task (like AssetServer does) - spawned future + /// can write results to some channel / event queue. + pub fn spawn_async_compute( + &self, + future: impl Future + Send + 'static, + ) -> Task + where + T: Send + 'static, + { + wasm_bindgen_futures::spawn_local(async move { + future.await; + }); + FakeTask + } + + /// Spawns a static future onto the JS event loop. For now it is returning FakeTask + /// instance with no-op detach method. Returning real Task is possible here, but tricky: + /// future is running on JS event loop, Task is running on async_executor::LocalExecutor + /// so some proxy future is needed. Moreover currently we don't have long-living + /// LocalExecutor here (above `spawn` implementation creates temporary one) + /// But for typical use cases it seems that current implementation should be sufficient: + /// caller can spawn long-running future writing results to some channel / event queue + /// and simply call detach on returned Task (like AssetServer does) - spawned future + /// can write results to some channel / event queue. + pub fn spawn_io(&self, future: impl Future + Send + 'static) -> Task + where + T: Send + 'static, + { + wasm_bindgen_futures::spawn_local(async move { + future.await; + }); + FakeTask + } + /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskSpool::spawn`]. pub fn spawn_local(&self, future: impl Future + 'static) -> FakeTask where @@ -145,6 +200,28 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { self.spawn_local(f); } + /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive + /// the provided future. 
The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. + /// + /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn_async_compute + 'scope + Send>(&mut self, f: Fut) { + self.spawn_local(f); + } + + /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive + /// the provided future. The results of the future will be returned as a part of + /// [`TaskPool::scope`]'s return value. + /// + /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. + /// + /// For more information, see [`TaskPool::scope`]. + pub fn spawn_io + 'scope + Send>(&mut self, f: Fut) { + self.spawn_local(f); + } + /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9ccc11e4894c7..f79738a372d2b 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -32,18 +32,20 @@ impl TaskPoolBuilder { Self::default() } - /// Override the number of threads created for the pool. If unset, we default to the number + /// Override the number of compute-priority threads created for the pool. If unset, this default to the number /// of logical cores of the system pub fn compute_threads(mut self, num_threads: usize) -> Self { self.compute_threads = Some(num_threads); self } + /// Override the number of async-compute priority threads created for the pool. If unset, this defaults to 0. pub fn async_compute_threads(mut self, num_threads: usize) -> Self { self.async_compute_threads = Some(num_threads); self } + /// Override the number of IO-priority threads created for the pool. If unset, this defaults to 0. pub fn io_threads(mut self, num_threads: usize) -> Self { self.io_threads = Some(num_threads); self @@ -90,6 +92,20 @@ impl Drop for TaskPoolInner { /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. +/// +/// # Scheduling Semantics +/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async +/// Compute. Compute is higher priority than IO, which are both higher priority than async compute. +/// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize +/// its specific tasks (i.e. IO tasks on a IO thread), but will run higher priority tasks if it would +/// otherwise be sitting idle. +/// +/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and +/// async compute thread groups, but any IO task will take precedence over any compute task on the IO +/// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread. +/// +/// By default, all threads in the pool are dedicated to compute group. Thread counts can be altered +/// via [`TaskPoolBuilder`] when constructing the pool. 
#[derive(Debug, Clone)] pub struct TaskPool { /// The executor for the pool @@ -122,13 +138,13 @@ impl TaskPool { let async_compute_executor = Arc::new(async_executor::Executor::new()); let io_executor = Arc::new(async_executor::Executor::new()); - let compute_threads = builder.compute_threads.unwrap_or(0); + let compute_threads = builder.compute_threads.unwrap_or_else(num_cpus::get); let io_threads = builder.io_threads.unwrap_or(0); - let async_compute_threads = builder.async_compute_threads.unwrap_or_else(num_cpus::get); + let async_compute_threads = builder.async_compute_threads.unwrap_or(0); let mut threads = Vec::with_capacity(compute_threads + io_threads + async_compute_threads); threads.extend((0..compute_threads).map(|i| { - let mut thread_builder = make_thread_builder( + let thread_builder = make_thread_builder( builder.thread_name.as_deref(), "Compute", i, @@ -148,7 +164,7 @@ impl TaskPool { .expect("Failed to spawn thread.") })); threads.extend((0..io_threads).map(|i| { - let mut thread_builder = + let thread_builder = make_thread_builder(builder.thread_name.as_deref(), "IO", i, builder.stack_size); let compute = Arc::clone(&compute_executor); let io = Arc::clone(&io_executor); @@ -165,7 +181,7 @@ impl TaskPool { .expect("Failed to spawn thread.") })); threads.extend((0..compute_threads).map(|i| { - let mut thread_builder = make_thread_builder( + let thread_builder = make_thread_builder( builder.thread_name.as_deref(), "Aync Compute", i, @@ -287,9 +303,9 @@ impl TaskPool { }) } - /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be - /// cancelled and "detached" allowing it to continue running without having to be polled by the - /// end-user. + /// Spawns a static future onto the thread pool with "compute" priority. The returned Task is a future. + /// It can also be cancelled and "detached" allowing it to continue running without having to be polled + /// by the end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. pub fn spawn(&self, future: impl Future + Send + 'static) -> Task @@ -299,6 +315,11 @@ impl TaskPool { Task::new(self.compute_executor.spawn(future)) } + /// Spawns a static future onto the thread pool with "async compute" priority. The returned Task is a future. + /// It can also be cancelled and "detached" allowing it to continue running without having to be polled + /// by the end-user. + /// + /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. pub fn spawn_async_compute( &self, future: impl Future + Send + 'static, @@ -309,6 +330,11 @@ impl TaskPool { Task::new(self.async_compute_executor.spawn(future)) } + /// Spawns a static future onto the thread pool with "IO" priority. The returned Task is a future. + /// It can also be cancelled and "detached" allowing it to continue running without having to be polled + /// by the end-user. + /// + /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. pub fn spawn_io(&self, future: impl Future + Send + 'static) -> Task where T: Send + 'static, @@ -348,9 +374,9 @@ pub struct Scope<'scope, T> { } impl<'scope, T: Send + 'scope> Scope<'scope, T> { - /// Spawns a scoped future onto the thread pool. The scope *must* outlive - /// the provided future. The results of the future will be returned as a part of - /// [`TaskPool::scope`]'s return value. + /// Spawns a scoped future onto the thread pool with "compute" priority. 
The scope + /// *must* outlive the provided future. The results of the future will be returned + /// as a part of [`TaskPool::scope`]'s return value. /// /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used /// instead. @@ -361,11 +387,27 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { self.spawned.push(task); } + /// Spawns a scoped future onto the thread pool with "async compute" priority. The scope + /// *must* outlive the provided future. The results of the future will be returned as a + /// part of [`TaskPool::scope`]'s return value. + /// + /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used + /// instead. + /// + /// For more information, see [`TaskPool::scope`]. pub fn spawn_async_compute + 'scope + Send>(&mut self, f: Fut) { let task = self.async_compute_executor.spawn(f); self.spawned.push(task); } + /// Spawns a scoped future onto the thread pool with "IO" priority. The scope + /// *must* outlive the provided future. The results of the future will be returned as a + /// part of [`TaskPool::scope`]'s return value. + /// + /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used + /// instead. + /// + /// For more information, see [`TaskPool::scope`]. pub fn spawn_io + 'scope + Send>(&mut self, f: Fut) { let task = self.io_executor.spawn(f); self.spawned.push(task); From dadfe150b2cb20a998c695c5e1d55f16f8940fbf Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 13:41:48 -0700 Subject: [PATCH 04/95] Formatting --- .../src/single_threaded_task_pool.rs | 10 +++--- crates/bevy_tasks/src/task_pool.rs | 34 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index acdfa903fe4a6..30dd16841e45f 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -37,18 +37,18 @@ impl TaskPoolBuilder { /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. In this case - main thread only. -/// +/// /// # Scheduling Semantics -/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async +/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async /// Compute. Compute is higher priority than IO, which are both higher priority than async compute. /// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize /// its specific tasks (i.e. IO tasks on a IO thread), but will run higher priority tasks if it would /// otherwise be sitting idle. -/// -/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and +/// +/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and /// async compute thread groups, but any IO task will take precedence over any compute task on the IO /// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread. -/// +/// /// By default, all threads in the pool are dedicated to compute group. Thread counts can be altered /// via [`TaskPoolBuilder`] when constructing the pool. 
#[derive(Debug, Default, Clone)] diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index f79738a372d2b..683b359c2b8b9 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -92,18 +92,18 @@ impl Drop for TaskPoolInner { /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. -/// +/// /// # Scheduling Semantics -/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async +/// Each thread in the pool is assigned to one of three priority groups: Compute, IO, and Async /// Compute. Compute is higher priority than IO, which are both higher priority than async compute. /// Every task is assigned to a group upon being spawned. A lower priority thread will always prioritize /// its specific tasks (i.e. IO tasks on a IO thread), but will run higher priority tasks if it would /// otherwise be sitting idle. -/// -/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and +/// +/// For example, under heavy compute workloads, compute tasks will be scheduled to run on the IO and /// async compute thread groups, but any IO task will take precedence over any compute task on the IO /// threads. Likewise, async compute tasks will never be scheduled on a compute or IO thread. -/// +/// /// By default, all threads in the pool are dedicated to compute group. Thread counts can be altered /// via [`TaskPoolBuilder`] when constructing the pool. #[derive(Debug, Clone)] @@ -303,8 +303,8 @@ impl TaskPool { }) } - /// Spawns a static future onto the thread pool with "compute" priority. The returned Task is a future. - /// It can also be cancelled and "detached" allowing it to continue running without having to be polled + /// Spawns a static future onto the thread pool with "compute" priority. The returned Task is a future. + /// It can also be cancelled and "detached" allowing it to continue running without having to be polled /// by the end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. @@ -315,8 +315,8 @@ impl TaskPool { Task::new(self.compute_executor.spawn(future)) } - /// Spawns a static future onto the thread pool with "async compute" priority. The returned Task is a future. - /// It can also be cancelled and "detached" allowing it to continue running without having to be polled + /// Spawns a static future onto the thread pool with "async compute" priority. The returned Task is a future. + /// It can also be cancelled and "detached" allowing it to continue running without having to be polled /// by the end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. @@ -330,8 +330,8 @@ impl TaskPool { Task::new(self.async_compute_executor.spawn(future)) } - /// Spawns a static future onto the thread pool with "IO" priority. The returned Task is a future. - /// It can also be cancelled and "detached" allowing it to continue running without having to be polled + /// Spawns a static future onto the thread pool with "IO" priority. The returned Task is a future. + /// It can also be cancelled and "detached" allowing it to continue running without having to be polled /// by the end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. 
@@ -374,8 +374,8 @@ pub struct Scope<'scope, T> { } impl<'scope, T: Send + 'scope> Scope<'scope, T> { - /// Spawns a scoped future onto the thread pool with "compute" priority. The scope - /// *must* outlive the provided future. The results of the future will be returned + /// Spawns a scoped future onto the thread pool with "compute" priority. The scope + /// *must* outlive the provided future. The results of the future will be returned /// as a part of [`TaskPool::scope`]'s return value. /// /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used @@ -387,8 +387,8 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { self.spawned.push(task); } - /// Spawns a scoped future onto the thread pool with "async compute" priority. The scope - /// *must* outlive the provided future. The results of the future will be returned as a + /// Spawns a scoped future onto the thread pool with "async compute" priority. The scope + /// *must* outlive the provided future. The results of the future will be returned as a /// part of [`TaskPool::scope`]'s return value. /// /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used @@ -400,8 +400,8 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { self.spawned.push(task); } - /// Spawns a scoped future onto the thread pool with "IO" priority. The scope - /// *must* outlive the provided future. The results of the future will be returned as a + /// Spawns a scoped future onto the thread pool with "IO" priority. The scope + /// *must* outlive the provided future. The results of the future will be returned as a /// part of [`TaskPool::scope`]'s return value. /// /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used From 3f260b25d871f0d5b1517df2d5694c506d8b6bf5 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 13:45:07 -0700 Subject: [PATCH 05/95] Fix WASM --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 30dd16841e45f..7577fad6cbf4e 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -132,7 +132,7 @@ impl TaskPool { pub fn spawn_async_compute( &self, future: impl Future + Send + 'static, - ) -> Task + ) -> FakeTask where T: Send + 'static, { @@ -151,7 +151,7 @@ impl TaskPool { /// caller can spawn long-running future writing results to some channel / event queue /// and simply call detach on returned Task (like AssetServer does) - spawned future /// can write results to some channel / event queue. 
- pub fn spawn_io(&self, future: impl Future + Send + 'static) -> Task + pub fn spawn_io(&self, future: impl Future + Send + 'static) -> FakeTask where T: Send + 'static, { From e35e11a6cfc3cf936843e4cd6224813da122750b Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 13:53:50 -0700 Subject: [PATCH 06/95] Fix local tests for bevy_task --- crates/bevy_tasks/examples/busy_behavior.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 8a74034e0ca90..8cd7c0ad5f0ec 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -1,13 +1,13 @@ use bevy_tasks::TaskPoolBuilder; -// This sample demonstrates creating a thread pool with 4 tasks and spawning 40 tasks that spin -// for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical +// This sample demonstrates creating a thread pool with 4 compute threads and spawning 40 tasks that +// spin for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical // cores) fn main() { let pool = TaskPoolBuilder::new() .thread_name("Busy Behavior ThreadPool".to_string()) - .num_threads(4) + .compute_threads(4) .build(); let t0 = instant::Instant::now(); From 2eab8fcd1d0d5235e01074ff8a431d05b8354b0f Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 14:17:59 -0700 Subject: [PATCH 07/95] Fix CI --- benches/benches/bevy_tasks/iter.rs | 6 +++--- crates/bevy_asset/src/debug_asset_server.rs | 8 ++++---- crates/bevy_ecs/src/schedule/executor_parallel.rs | 4 +--- crates/bevy_tasks/examples/busy_behavior.rs | 2 +- examples/asset/custom_asset_io.rs | 2 +- examples/async_tasks/async_compute.rs | 12 ++++++------ examples/ecs/parallel_query.rs | 4 ++-- 7 files changed, 18 insertions(+), 20 deletions(-) diff --git a/benches/benches/bevy_tasks/iter.rs b/benches/benches/bevy_tasks/iter.rs index 74b043f9a234e..3429deaa5b96a 100644 --- a/benches/benches/bevy_tasks/iter.rs +++ b/benches/benches/bevy_tasks/iter.rs @@ -34,7 +34,7 @@ fn bench_overhead(c: &mut Criterion) { let mut v = (0..10000).collect::>(); let mut group = c.benchmark_group("overhead_par_iter"); for thread_count in &[1, 2, 4, 8, 16, 32] { - let pool = TaskPoolBuilder::new().num_threads(*thread_count).build(); + let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build(); group.bench_with_input( BenchmarkId::new("threads", thread_count), thread_count, @@ -69,7 +69,7 @@ fn bench_for_each(c: &mut Criterion) { let mut v = (0..10000).collect::>(); let mut group = c.benchmark_group("for_each_par_iter"); for thread_count in &[1, 2, 4, 8, 16, 32] { - let pool = TaskPoolBuilder::new().num_threads(*thread_count).build(); + let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build(); group.bench_with_input( BenchmarkId::new("threads", thread_count), thread_count, @@ -115,7 +115,7 @@ fn bench_many_maps(c: &mut Criterion) { let v = (0..10000).collect::>(); let mut group = c.benchmark_group("many_maps_par_iter"); for thread_count in &[1, 2, 4, 8, 16, 32] { - let pool = TaskPoolBuilder::new().num_threads(*thread_count).build(); + let pool = TaskPoolBuilder::new().compute_threads(*thread_count).build(); group.bench_with_input( BenchmarkId::new("threads", thread_count), thread_count, diff --git a/crates/bevy_asset/src/debug_asset_server.rs b/crates/bevy_asset/src/debug_asset_server.rs index d4970dabcc071..76b47ef83f124 100644 --- 
a/crates/bevy_asset/src/debug_asset_server.rs +++ b/crates/bevy_asset/src/debug_asset_server.rs @@ -4,7 +4,7 @@ use bevy_ecs::{ schedule::SystemLabel, system::{NonSendMut, Res, ResMut, SystemState}, }; -use bevy_tasks::{IoTaskPool, TaskPoolBuilder}; +use bevy_tasks::TaskPoolBuilder; use bevy_utils::HashMap; use std::{ ops::{Deref, DerefMut}, @@ -60,12 +60,12 @@ impl Plugin for DebugAssetServerPlugin { fn build(&self, app: &mut bevy_app::App) { let mut debug_asset_app = App::new(); debug_asset_app - .insert_resource(IoTaskPool( + .insert_resource( TaskPoolBuilder::default() - .num_threads(2) + .io_threads(2) .thread_name("Debug Asset Server IO Task Pool".to_string()) .build(), - )) + ) .insert_resource(AssetServerSettings { asset_folder: "crates".to_string(), watch_for_changes: true, diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 60e769a589855..91ed3de6a9477 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -123,9 +123,7 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - let task_pool = world - .get_resource_or_insert_with(|| TaskPool::default()) - .clone(); + let task_pool = world.get_resource_or_insert_with(TaskPool::default).clone(); task_pool.scope(|scope| { self.prepare_systems(scope, systems, world); let parallel_executor = async { diff --git a/crates/bevy_tasks/examples/busy_behavior.rs b/crates/bevy_tasks/examples/busy_behavior.rs index 8cd7c0ad5f0ec..e220175c1e120 100644 --- a/crates/bevy_tasks/examples/busy_behavior.rs +++ b/crates/bevy_tasks/examples/busy_behavior.rs @@ -1,6 +1,6 @@ use bevy_tasks::TaskPoolBuilder; -// This sample demonstrates creating a thread pool with 4 compute threads and spawning 40 tasks that +// This sample demonstrates creating a thread pool with 4 compute threads and spawning 40 tasks that // spin for 100ms. It's expected to take about a second to run (assuming the machine has >= 4 logical // cores) diff --git a/examples/asset/custom_asset_io.rs b/examples/asset/custom_asset_io.rs index af74a93d2ba3e..10bee5cf510a5 100644 --- a/examples/asset/custom_asset_io.rs +++ b/examples/asset/custom_asset_io.rs @@ -49,7 +49,7 @@ impl Plugin for CustomAssetIoPlugin { fn build(&self, app: &mut App) { // must get a hold of the task pool in order to create the asset server - let task_pool = app.world.resource::().0.clone(); + let task_pool = app.world.resource::().clone(); let asset_io = { // the platform default asset io requires a reference to the app diff --git a/examples/async_tasks/async_compute.rs b/examples/async_tasks/async_compute.rs index 9182ac48797fd..43b8ab9a748ea 100644 --- a/examples/async_tasks/async_compute.rs +++ b/examples/async_tasks/async_compute.rs @@ -1,13 +1,13 @@ use bevy::{ prelude::*, - tasks::{AsyncComputeTaskPool, Task}, + tasks::{Task, TaskPool}, }; use futures_lite::future; use rand::Rng; use std::time::{Duration, Instant}; -/// This example shows how to use the ECS and the [`AsyncComputeTaskPool`] -/// to spawn, poll, and complete tasks across systems and system ticks. +/// This example shows how to use the ECS and the [`TaskPool`] to +/// spawn, poll, and complete tasks across systems and system ticks. fn main() { App::new() .insert_resource(Msaa { samples: 4 }) @@ -50,12 +50,12 @@ struct ComputeTransform(Task); /// work that potentially spans multiple frames/ticks. 
A separate /// system, `handle_tasks`, will poll the spawned tasks on subsequent /// frames/ticks, and use the results to spawn cubes -fn spawn_tasks(mut commands: Commands, thread_pool: Res) { +fn spawn_tasks(mut commands: Commands, thread_pool: Res) { for x in 0..NUM_CUBES { for y in 0..NUM_CUBES { for z in 0..NUM_CUBES { - // Spawn new task on the AsyncComputeTaskPool - let task = thread_pool.spawn(async move { + // Spawn new task on the TaskPool + let task = thread_pool.spawn_async_compute(async move { let mut rng = rand::thread_rng(); let start_time = Instant::now(); let duration = Duration::from_secs_f32(rng.gen_range(0.05..0.2)); diff --git a/examples/ecs/parallel_query.rs b/examples/ecs/parallel_query.rs index a70eecf6d8b1a..d58fa1e046d4f 100644 --- a/examples/ecs/parallel_query.rs +++ b/examples/ecs/parallel_query.rs @@ -21,7 +21,7 @@ fn spawn_system(mut commands: Commands, asset_server: Res) { } // Move sprites according to their velocity -fn move_system(pool: Res, mut sprites: Query<(&mut Transform, &Velocity)>) { +fn move_system(pool: Res, mut sprites: Query<(&mut Transform, &Velocity)>) { // Compute the new location of each sprite in parallel on the // ComputeTaskPool using batches of 32 sprites // @@ -37,7 +37,7 @@ fn move_system(pool: Res, mut sprites: Query<(&mut Transform, & // Bounce sprites outside the window fn bounce_system( - pool: Res, + pool: Res, windows: Res, mut sprites: Query<(&Transform, &mut Velocity)>, ) { From 02a36b5e28ea2fa1954685a9eb4927c40fc1d8f3 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 14:55:14 -0700 Subject: [PATCH 08/95] Fix wasm32 --- crates/bevy_tasks/src/lib.rs | 2 ++ crates/bevy_tasks/src/task_pool.rs | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 1967f150f1ace..f8fe69f0e977c 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -28,6 +28,8 @@ pub mod prelude { slice::{ParallelSlice, ParallelSliceMut}, task_pool::TaskPool, }; + #[cfg(target_arch = "wasm32")] + pub use single_threaded_task_pool::TaskPool; } pub use num_cpus::get as logical_core_count; diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 683b359c2b8b9..aa0f5c3ce15af 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -180,7 +180,7 @@ impl TaskPool { }) .expect("Failed to spawn thread.") })); - threads.extend((0..compute_threads).map(|i| { + threads.extend((0..async_compute_threads).map(|i| { let thread_builder = make_thread_builder( builder.thread_name.as_deref(), "Aync Compute", From 76a2da0626b55566ab69dd9d5aae4da71435148f Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 14:55:52 -0700 Subject: [PATCH 09/95] Fix miri --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index aa0f5c3ce15af..9e91c60111f7f 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -445,7 +445,7 @@ fn make_thread_builder( #[cfg(miri)] let mut thread_builder = { - let _ = i; + let _ = idx; let _ = thread_name; thread::Builder::new() }; From 2a0ae14d35c4dec89e4edcc8d744bf24d9dfebb9 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 15:01:46 -0700 Subject: [PATCH 10/95] More wasm32 tomfoolery --- crates/bevy_tasks/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index f8fe69f0e977c..9ced72324b88b 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -26,8 +26,9 @@ pub mod prelude { pub use crate::{ iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, - task_pool::TaskPool, }; + #[cfg(not(target_arch = "wasm32"))] + pub use task_pool::TaskPool; #[cfg(target_arch = "wasm32")] pub use single_threaded_task_pool::TaskPool; } From c32bea7e597a67b0d5cd1cc0d4d90ea14e23f944 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 15:03:45 -0700 Subject: [PATCH 11/95] Derp --- crates/bevy_tasks/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 9ced72324b88b..9b9802dae7b76 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -28,9 +28,9 @@ pub mod prelude { slice::{ParallelSlice, ParallelSliceMut}, }; #[cfg(not(target_arch = "wasm32"))] - pub use task_pool::TaskPool; + pub use crate::task_pool::TaskPool; #[cfg(target_arch = "wasm32")] - pub use single_threaded_task_pool::TaskPool; + pub use crate::single_threaded_task_pool::TaskPool; } pub use num_cpus::get as logical_core_count; From 79116178b6b0b83973578c3f6f32ca2035e6d993 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 15:10:20 -0700 Subject: [PATCH 12/95] Formatting woes --- crates/bevy_tasks/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 9b9802dae7b76..fe8a1c06d4ece 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -22,15 +22,15 @@ pub use iter::ParallelIterator; #[allow(missing_docs)] pub mod prelude { + #[cfg(target_arch = "wasm32")] + pub use crate::single_threaded_task_pool::TaskPool; + #[cfg(not(target_arch = "wasm32"))] + pub use crate::task_pool::TaskPool; #[doc(hidden)] pub use crate::{ iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, }; - #[cfg(not(target_arch = "wasm32"))] - pub use crate::task_pool::TaskPool; - #[cfg(target_arch = "wasm32")] - pub use crate::single_threaded_task_pool::TaskPool; } pub use num_cpus::get as logical_core_count; From c3e4cff23d06773b8f95e2296e1ac110c2e3bfe2 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 15:14:50 -0700 Subject: [PATCH 13/95] Fix up TaskPoolBuilder for wasm32 --- .../bevy_tasks/src/single_threaded_task_pool.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 7577fad6cbf4e..7cc6f88c75bed 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -14,8 +14,19 @@ impl TaskPoolBuilder { Self::default() } - /// No op on the single threaded task pool - pub fn num_threads(self, _num_threads: usize) -> Self { + /// Override the number of compute-priority threads created for the pool. If unset, this default to the number + /// of logical cores of the system + pub fn compute_threads(mut self, num_threads: usize) -> Self { + self + } + + /// Override the number of async-compute priority threads created for the pool. If unset, this defaults to 0. + pub fn async_compute_threads(mut self, num_threads: usize) -> Self { + self + } + + /// Override the number of IO-priority threads created for the pool. If unset, this defaults to 0. 
+ pub fn io_threads(mut self, num_threads: usize) -> Self { self } From 0b67525a1f7e380b7fa02a2a92ec4237988e4fcd Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 15:36:55 -0700 Subject: [PATCH 14/95] Remove unnecessary Send bounds --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 7cc6f88c75bed..b1df873df4177 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -207,7 +207,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + pub fn spawn + 'scope>(&mut self, f: Fut) { self.spawn_local(f); } @@ -218,7 +218,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_async_compute + 'scope + Send>(&mut self, f: Fut) { + pub fn spawn_async_compute + 'scope>(&mut self, f: Fut) { self.spawn_local(f); } @@ -229,7 +229,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_io + 'scope + Send>(&mut self, f: Fut) { + pub fn spawn_io + 'scope>(&mut self, f: Fut) { self.spawn_local(f); } From e87790e83c32d9ddd06814d3f680f8d55d845c9d Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 16:07:50 -0700 Subject: [PATCH 15/95] More wasm woes --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index b1df873df4177..162ddc7ecdc11 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -16,17 +16,17 @@ impl TaskPoolBuilder { /// Override the number of compute-priority threads created for the pool. If unset, this default to the number /// of logical cores of the system - pub fn compute_threads(mut self, num_threads: usize) -> Self { + pub fn compute_threads(self, num_threads: usize) -> Self { self } /// Override the number of async-compute priority threads created for the pool. If unset, this defaults to 0. - pub fn async_compute_threads(mut self, num_threads: usize) -> Self { + pub fn async_compute_threads(self, num_threads: usize) -> Self { self } /// Override the number of IO-priority threads created for the pool. If unset, this defaults to 0. - pub fn io_threads(mut self, num_threads: usize) -> Self { + pub fn io_threads(self, num_threads: usize) -> Self { self } @@ -142,7 +142,7 @@ impl TaskPool { /// can write results to some channel / event queue. pub fn spawn_async_compute( &self, - future: impl Future + Send + 'static, + future: impl Future + 'static, ) -> FakeTask where T: Send + 'static, @@ -162,7 +162,7 @@ impl TaskPool { /// caller can spawn long-running future writing results to some channel / event queue /// and simply call detach on returned Task (like AssetServer does) - spawned future /// can write results to some channel / event queue. 
- pub fn spawn_io(&self, future: impl Future + Send + 'static) -> FakeTask + pub fn spawn_io(&self, future: impl Future + 'static) -> FakeTask where T: Send + 'static, { From 8c722e3d09b66caceda051a7c6b86c96ba401f35 Mon Sep 17 00:00:00 2001 From: james7132 Date: Fri, 13 May 2022 16:13:10 -0700 Subject: [PATCH 16/95] Formatting --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 162ddc7ecdc11..7d5c49ace9ca0 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -140,10 +140,7 @@ impl TaskPool { /// caller can spawn long-running future writing results to some channel / event queue /// and simply call detach on returned Task (like AssetServer does) - spawned future /// can write results to some channel / event queue. - pub fn spawn_async_compute( - &self, - future: impl Future + 'static, - ) -> FakeTask + pub fn spawn_async_compute(&self, future: impl Future + 'static) -> FakeTask where T: Send + 'static, { From 6ad621ef71312021cc9d0dfb78a4dd528fd94bdb Mon Sep 17 00:00:00 2001 From: james7132 Date: Sat, 14 May 2022 13:21:33 -0700 Subject: [PATCH 17/95] Use run instead custom run_forever --- crates/bevy_tasks/src/task_pool.rs | 41 ++++++++++++------------------ 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9e91c60111f7f..bd3e89c2d2a27 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -155,11 +155,9 @@ impl TaskPool { thread_builder .spawn(move || { - let future = run_forever(compute).or(async { - // Use unwrap_err because we expect a Closed error - shutdown_rx.recv().await.unwrap_err(); - }); - future::block_on(future); + let future = compute.run(shutdown_rx.recv()).or(shutdown_rx.recv()); + // Use unwrap_err because we expect a Closed error + future::block_on(future).unwrap_err(); }) .expect("Failed to spawn thread.") })); @@ -172,11 +170,12 @@ impl TaskPool { thread_builder .spawn(move || { - let future = run_forever(io).or(run_forever(compute)).or(async { - // Use unwrap_err because we expect a Closed error - shutdown_rx.recv().await.unwrap_err(); - }); - future::block_on(future); + let future = io + .run(shutdown_rx.recv()) + .or(compute.run(shutdown_rx.recv())) + .or(shutdown_rx.recv()); + // Use unwrap_err because we expect a Closed error + future::block_on(future).unwrap_err(); }) .expect("Failed to spawn thread.") })); @@ -194,14 +193,13 @@ impl TaskPool { thread_builder .spawn(move || { - let future = run_forever(async_compute) - .or(run_forever(compute)) - .or(run_forever(io)) - .or(async { - // Use unwrap_err because we expect a Closed error - shutdown_rx.recv().await.unwrap_err(); - }); - future::block_on(future); + let future = async_compute + .run(shutdown_rx.recv()) + .or(compute.run(shutdown_rx.recv())) + .or(io.run(shutdown_rx.recv())) + .or(shutdown_rx.recv()); + // Use unwrap_err because we expect a Closed error + future::block_on(future).unwrap_err(); }) .expect("Failed to spawn thread.") })); @@ -457,13 +455,6 @@ fn make_thread_builder( thread_builder } -async fn run_forever(executor: Arc>) { - loop { - while executor.try_tick() {} - future::yield_now().await; - } -} - #[cfg(test)] #[allow(clippy::blacklisted_name)] mod tests { From 92e8bdfa52615e12a12e97dee2a7e5e234cdacd9 Mon Sep 17 
00:00:00 2001 From: james7132 Date: Sat, 14 May 2022 23:13:34 -0700 Subject: [PATCH 18/95] Remove redundant "or" call --- crates/bevy_tasks/src/task_pool.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index bd3e89c2d2a27..05a3232b7beca 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -155,9 +155,8 @@ impl TaskPool { thread_builder .spawn(move || { - let future = compute.run(shutdown_rx.recv()).or(shutdown_rx.recv()); // Use unwrap_err because we expect a Closed error - future::block_on(future).unwrap_err(); + future::block_on(compute.run(shutdown_rx.recv())).unwrap_err(); }) .expect("Failed to spawn thread.") })); From 36dd17744dbc429f45ec10fa94c38e13c0832f4e Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 00:14:07 -0700 Subject: [PATCH 19/95] Use TaskGroup to specify target group instead --- crates/bevy_asset/src/asset_server.rs | 4 +- crates/bevy_tasks/src/lib.rs | 3 +- crates/bevy_tasks/src/task.rs | 19 +++++ crates/bevy_tasks/src/task_pool.rs | 110 +++++++++++--------------- examples/async_tasks/async_compute.rs | 4 +- 5 files changed, 70 insertions(+), 70 deletions(-) diff --git a/crates/bevy_asset/src/asset_server.rs b/crates/bevy_asset/src/asset_server.rs index 4efd569b6e450..ce49ca52764d6 100644 --- a/crates/bevy_asset/src/asset_server.rs +++ b/crates/bevy_asset/src/asset_server.rs @@ -7,7 +7,7 @@ use crate::{ use anyhow::Result; use bevy_ecs::system::{Res, ResMut}; use bevy_log::warn; -use bevy_tasks::TaskPool; +use bevy_tasks::{TaskGroup, TaskPool}; use bevy_utils::{Entry, HashMap, Uuid}; use crossbeam_channel::TryRecvError; use parking_lot::{Mutex, RwLock}; @@ -379,7 +379,7 @@ impl AssetServer { let owned_path = asset_path.to_owned(); self.server .task_pool - .spawn_io(async move { + .spawn_as(TaskGroup::IO, async move { if let Err(err) = server.load_async(owned_path, force).await { warn!("{}", err); } diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index fe8a1c06d4ece..31666512d4362 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -5,7 +5,7 @@ mod slice; pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; -pub use task::Task; +pub use task::{Task, TaskGroup}; #[cfg(not(target_arch = "wasm32"))] mod task_pool; @@ -30,6 +30,7 @@ pub mod prelude { pub use crate::{ iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, + TaskGroup, }; } diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index eb257080b4207..156efadfdc50c 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -4,6 +4,25 @@ use std::{ task::{Context, Poll}, }; +/// A group a task is assigned to upon being spawned. +/// +/// By default, `Compute` is used for [`TaskPool::spawn`]. +/// +/// [`TaskPool::spawn`]: crate::TaskPool::spawn +#[derive(Clone, Copy, Debug)] +pub enum TaskGroup { + /// CPU-bound, short-lived tasks. Usually used for tasks + /// within the scope of a single frame of a game. + Compute, + /// IO-bound, potentially long lasting tasks that readily + /// yield. Usually used for loading assets or doing network + /// communication. + IO, + /// CPU-bound, long-lived takss. Usually used for tasks + /// that last longer than a single frame. + AsyncCompute, +} + /// Wraps `async_executor::Task`, a spawned future. /// /// Tasks are also futures themselves and yield the output of the spawned future. 
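// Illustrative sketch (not part of the patch): roughly how the TaskGroup added above is
// expected to be used together with TaskPool::spawn / TaskPool::spawn_as, judging from the
// builder fields and the call sites updated in this series (AssetServer, the async_compute
// example). The thread counts and the async bodies here are made-up illustration values.
use bevy_tasks::{TaskGroup, TaskPool, TaskPoolBuilder};
use futures_lite::future;

fn demo() {
    // A single pool now owns threads for every priority group.
    let pool: TaskPool = TaskPoolBuilder::default()
        .compute_threads(4)
        .async_compute_threads(1)
        .io_threads(1)
        .build();

    // `spawn` keeps its old signature and lands in the Compute group.
    let frame_work = pool.spawn(async { 2 + 2 });

    // Longer-lived or IO-bound work picks its group explicitly via `spawn_as`.
    let background = pool.spawn_as(TaskGroup::AsyncCompute, async { 40 + 2 });

    // Detached tasks keep running without being polled, as AssetServer does for IO loads.
    pool.spawn_as(TaskGroup::IO, async { /* e.g. load an asset */ })
        .detach();

    assert_eq!(future::block_on(frame_work) + future::block_on(background), 46);
}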
diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 05a3232b7beca..d428d9337a7a2 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -8,7 +8,7 @@ use std::{ use futures_lite::{future, pin, FutureExt}; -use crate::Task; +use crate::{Task, TaskGroup}; /// Used to create a [`TaskPool`] #[derive(Debug, Default, Clone)] @@ -229,26 +229,42 @@ impl TaskPool { F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { + self.scope_as(TaskGroup::Compute, f) + } + + /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + pub fn scope_as<'scope, F, T>(&self, group: TaskGroup, f: F) -> Vec + where + F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + T: Send + 'static, + { + // SAFETY: This function blocks until all futures complete, so this future must return + // before this function returns. However, rust has no way of knowing + // this so we must convert to 'static here to appease the compiler as it is unable to + // validate safety. + let executor: &'scope async_executor::Executor = match group { + TaskGroup::Compute => { + let executor: &async_executor::Executor = &*self.compute_executor; + unsafe { mem::transmute(executor) } + } + TaskGroup::AsyncCompute => { + let executor: &async_executor::Executor = &*self.async_compute_executor; + unsafe { mem::transmute(executor) } + } + TaskGroup::IO => { + let executor: &async_executor::Executor = &*self.io_executor; + unsafe { mem::transmute(executor) } + } + }; TaskPool::LOCAL_EXECUTOR.with(|local_executor| { - // SAFETY: This function blocks until all futures complete, so this future must return - // before this function returns. However, rust has no way of knowing - // this so we must convert to 'static here to appease the compiler as it is unable to - // validate safety. - let compute_executor: &async_executor::Executor = &*self.compute_executor; - let compute_executor: &'scope async_executor::Executor = - unsafe { mem::transmute(compute_executor) }; - let async_compute_executor: &async_executor::Executor = &*self.async_compute_executor; - let async_compute_executor: &'scope async_executor::Executor = - unsafe { mem::transmute(async_compute_executor) }; - let io_executor: &async_executor::Executor = &*self.io_executor; - let io_executor: &'scope async_executor::Executor = - unsafe { mem::transmute(io_executor) }; let local_executor: &'scope async_executor::LocalExecutor = unsafe { mem::transmute(local_executor) }; let mut scope = Scope { - compute_executor, - async_compute_executor, - io_executor, + executor: <&'scope async_executor::Executor>::clone(&executor), local_executor, spawned: Vec::new(), }; @@ -291,9 +307,7 @@ impl TaskPool { break result; }; - self.compute_executor.try_tick(); - self.async_compute_executor.try_tick(); - self.io_executor.try_tick(); + executor.try_tick(); local_executor.try_tick(); } } @@ -309,7 +323,7 @@ impl TaskPool { where T: Send + 'static, { - Task::new(self.compute_executor.spawn(future)) + self.spawn_as(TaskGroup::Compute, future) } /// Spawns a static future onto the thread pool with "async compute" priority. The returned Task is a future. @@ -317,26 +331,20 @@ impl TaskPool { /// by the end-user. 
/// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. - pub fn spawn_async_compute( + #[inline] + pub fn spawn_as( &self, + group: TaskGroup, future: impl Future + Send + 'static, ) -> Task where T: Send + 'static, { - Task::new(self.async_compute_executor.spawn(future)) - } - - /// Spawns a static future onto the thread pool with "IO" priority. The returned Task is a future. - /// It can also be cancelled and "detached" allowing it to continue running without having to be polled - /// by the end-user. - /// - /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. - pub fn spawn_io(&self, future: impl Future + Send + 'static) -> Task - where - T: Send + 'static, - { - Task::new(self.io_executor.spawn(future)) + Task::new(match group { + TaskGroup::Compute => self.compute_executor.spawn(future), + TaskGroup::AsyncCompute => self.async_compute_executor.spawn(future), + TaskGroup::IO => self.io_executor.spawn(future), + }) } /// Spawns a static future on the thread-local async executor for the current thread. The task @@ -363,9 +371,7 @@ impl Default for TaskPool { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, T> { - compute_executor: &'scope async_executor::Executor<'scope>, - async_compute_executor: &'scope async_executor::Executor<'scope>, - io_executor: &'scope async_executor::Executor<'scope>, + executor: &'scope async_executor::Executor<'scope>, local_executor: &'scope async_executor::LocalExecutor<'scope>, spawned: Vec>, } @@ -380,33 +386,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn + 'scope + Send>(&mut self, f: Fut) { - let task = self.compute_executor.spawn(f); - self.spawned.push(task); - } - - /// Spawns a scoped future onto the thread pool with "async compute" priority. The scope - /// *must* outlive the provided future. The results of the future will be returned as a - /// part of [`TaskPool::scope`]'s return value. - /// - /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used - /// instead. - /// - /// For more information, see [`TaskPool::scope`]. - pub fn spawn_async_compute + 'scope + Send>(&mut self, f: Fut) { - let task = self.async_compute_executor.spawn(f); - self.spawned.push(task); - } - - /// Spawns a scoped future onto the thread pool with "IO" priority. The scope - /// *must* outlive the provided future. The results of the future will be returned as a - /// part of [`TaskPool::scope`]'s return value. - /// - /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used - /// instead. - /// - /// For more information, see [`TaskPool::scope`]. 
- pub fn spawn_io + 'scope + Send>(&mut self, f: Fut) { - let task = self.io_executor.spawn(f); + let task = self.executor.spawn(f); self.spawned.push(task); } diff --git a/examples/async_tasks/async_compute.rs b/examples/async_tasks/async_compute.rs index 43b8ab9a748ea..a80e8bbf1c9c7 100644 --- a/examples/async_tasks/async_compute.rs +++ b/examples/async_tasks/async_compute.rs @@ -1,6 +1,6 @@ use bevy::{ prelude::*, - tasks::{Task, TaskPool}, + tasks::{Task, TaskGroup, TaskPool}, }; use futures_lite::future; use rand::Rng; @@ -55,7 +55,7 @@ fn spawn_tasks(mut commands: Commands, thread_pool: Res) { for y in 0..NUM_CUBES { for z in 0..NUM_CUBES { // Spawn new task on the TaskPool - let task = thread_pool.spawn_async_compute(async move { + let task = thread_pool.spawn_as(TaskGroup::AsyncCompute, async move { let mut rng = rand::thread_rng(); let start_time = Instant::now(); let duration = Duration::from_secs_f32(rng.gen_range(0.05..0.2)); From d8f90167bd682abeb469394e49d59637ff8a2e8f Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 00:37:00 -0700 Subject: [PATCH 20/95] Fix WASM --- .../src/single_threaded_task_pool.rs | 46 +++++++++---------- crates/bevy_tasks/src/task_pool.rs | 20 +++++--- 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 7d5c49ace9ca0..24a6c71fc011a 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,3 +1,4 @@ +use crate::TaskGroup; use std::{ future::Future, mem, @@ -87,6 +88,20 @@ impl TaskPool { /// /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'scope, F, T>(&self, f: F) -> Vec + where + F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + T: Send + 'static, + { + self.scope_as(TaskGroup::Compute, f) + } + + /// Allows spawning non-`static futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + #[inline] + pub fn scope_as<'scope, F, T>(&self, _: TaskGroup, f: F) -> Vec where F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, T: Send + 'static, @@ -125,29 +140,7 @@ impl TaskPool { where T: 'static, { - wasm_bindgen_futures::spawn_local(async move { - future.await; - }); - FakeTask - } - - /// Spawns a static future onto the JS event loop. For now it is returning FakeTask - /// instance with no-op detach method. Returning real Task is possible here, but tricky: - /// future is running on JS event loop, Task is running on async_executor::LocalExecutor - /// so some proxy future is needed. Moreover currently we don't have long-living - /// LocalExecutor here (above `spawn` implementation creates temporary one) - /// But for typical use cases it seems that current implementation should be sufficient: - /// caller can spawn long-running future writing results to some channel / event queue - /// and simply call detach on returned Task (like AssetServer does) - spawned future - /// can write results to some channel / event queue. 
- pub fn spawn_async_compute(&self, future: impl Future + 'static) -> FakeTask - where - T: Send + 'static, - { - wasm_bindgen_futures::spawn_local(async move { - future.await; - }); - FakeTask + self.spawn_as(TaskGroup::Compute, future) } /// Spawns a static future onto the JS event loop. For now it is returning FakeTask @@ -159,7 +152,12 @@ impl TaskPool { /// caller can spawn long-running future writing results to some channel / event queue /// and simply call detach on returned Task (like AssetServer does) - spawned future /// can write results to some channel / event queue. - pub fn spawn_io(&self, future: impl Future + 'static) -> FakeTask + #[inline] + pub fn spawn_as( + &self, + group: TaskGroup, + future: impl Future + 'static, + ) -> FakeTask where T: Send + 'static, { diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index d428d9337a7a2..583e1ded6cfdf 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -219,11 +219,14 @@ impl TaskPool { self.inner.threads.len() } - /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, - /// passing a scope object into it. The scope object provided to the callback can be used - /// to spawn tasks. This function will await the completion of all tasks before returning. + /// Allows spawning non-`'static` futures on the thread pool under the [`Compute`] task group. + /// The function takes a callback, passing a scope object into it. The scope object provided to + /// the callback can be used to spawn tasks. This function will await the completion of all + /// tasks before returning. /// /// This is similar to `rayon::scope` and `crossbeam::scope` + /// + /// [`Compute`]: crate::TaskGroup::Compute pub fn scope<'scope, F, T>(&self, f: F) -> Vec where F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, @@ -232,9 +235,10 @@ impl TaskPool { self.scope_as(TaskGroup::Compute, f) } - /// Allows spawning non-`'static` futures on the thread pool. The function takes a callback, - /// passing a scope object into it. The scope object provided to the callback can be used - /// to spawn tasks. This function will await the completion of all tasks before returning. + /// Allows spawning non-`'static` futures on the thread pool in a specific task group. The + /// function takes a callback, passing a scope object into it. The scope object provided + /// to the callback can be used to spawn tasks. This function will await the completion of + /// all tasks before returning. /// /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope_as<'scope, F, T>(&self, group: TaskGroup, f: F) -> Vec @@ -314,11 +318,13 @@ impl TaskPool { }) } - /// Spawns a static future onto the thread pool with "compute" priority. The returned Task is a future. + /// Spawns a static future onto the thread pool in the [`Compute`] group. The returned Task is a future. /// It can also be cancelled and "detached" allowing it to continue running without having to be polled /// by the end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. 
+ /// + /// [`Compute`]: crate::TaskGroup::Compute pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where T: Send + 'static, From 5d9f71de3efac787704a3642bad017ae6a6787f6 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 00:39:57 -0700 Subject: [PATCH 21/95] Relax WASM bounds --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 24a6c71fc011a..2629a793744d2 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -159,7 +159,7 @@ impl TaskPool { future: impl Future + 'static, ) -> FakeTask where - T: Send + 'static, + T: 'static, { wasm_bindgen_futures::spawn_local(async move { future.await; From ce9c9c7df59c19d96734a095937f44c011140950 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 15:50:05 -0700 Subject: [PATCH 22/95] Remove more extra shutdown checks --- crates/bevy_tasks/src/task_pool.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 583e1ded6cfdf..d92ccbab800f1 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -171,8 +171,7 @@ impl TaskPool { .spawn(move || { let future = io .run(shutdown_rx.recv()) - .or(compute.run(shutdown_rx.recv())) - .or(shutdown_rx.recv()); + .or(compute.run(shutdown_rx.recv())); // Use unwrap_err because we expect a Closed error future::block_on(future).unwrap_err(); }) @@ -195,8 +194,7 @@ impl TaskPool { let future = async_compute .run(shutdown_rx.recv()) .or(compute.run(shutdown_rx.recv())) - .or(io.run(shutdown_rx.recv())) - .or(shutdown_rx.recv()); + .or(io.run(shutdown_rx.recv())); // Use unwrap_err because we expect a Closed error future::block_on(future).unwrap_err(); }) From 67b3b5a45326442a9bec2993c11a2a9d7a7f6baa Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 16:09:06 -0700 Subject: [PATCH 23/95] Clean up thread builder code --- crates/bevy_tasks/src/task_pool.rs | 30 ++++++------------------------ 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index d92ccbab800f1..3fcb875166574 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -144,16 +144,9 @@ impl TaskPool { let mut threads = Vec::with_capacity(compute_threads + io_threads + async_compute_threads); threads.extend((0..compute_threads).map(|i| { - let thread_builder = make_thread_builder( - builder.thread_name.as_deref(), - "Compute", - i, - builder.stack_size, - ); let compute = Arc::clone(&compute_executor); let shutdown_rx = shutdown_rx.clone(); - - thread_builder + make_thread_builder(&builder, "Compute", i) .spawn(move || { // Use unwrap_err because we expect a Closed error future::block_on(compute.run(shutdown_rx.recv())).unwrap_err(); @@ -161,13 +154,10 @@ impl TaskPool { .expect("Failed to spawn thread.") })); threads.extend((0..io_threads).map(|i| { - let thread_builder = - make_thread_builder(builder.thread_name.as_deref(), "IO", i, builder.stack_size); let compute = Arc::clone(&compute_executor); let io = Arc::clone(&io_executor); let shutdown_rx = shutdown_rx.clone(); - - thread_builder + make_thread_builder(&builder, "IO", i) .spawn(move || { let future = io .run(shutdown_rx.recv()) @@ -178,18 +168,11 @@ impl 
TaskPool { .expect("Failed to spawn thread.") })); threads.extend((0..async_compute_threads).map(|i| { - let thread_builder = make_thread_builder( - builder.thread_name.as_deref(), - "Aync Compute", - i, - builder.stack_size, - ); let compute = Arc::clone(&compute_executor); let async_compute = Arc::clone(&compute_executor); let io = Arc::clone(&io_executor); let shutdown_rx = shutdown_rx.clone(); - - thread_builder + make_thread_builder(&builder, "Aync Compute", i) .spawn(move || { let future = async_compute .run(shutdown_rx.recv()) @@ -407,16 +390,15 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { } fn make_thread_builder( - thread_name: Option<&str>, + builder: &TaskPoolBuilder, prefix: &'static str, idx: usize, - stack_size: Option, ) -> thread::Builder { // miri does not support setting thread names // TODO: change back when https://github.com/rust-lang/miri/issues/1717 is fixed #[cfg(not(miri))] let mut thread_builder = { - let thread_name = if let Some(ref thread_name) = thread_name { + let thread_name = if let Some(ref thread_name) = builder.thread_name { format!("{} ({}, {})", thread_name, prefix, idx) } else { format!("TaskPool ({}, {})", prefix, idx) @@ -431,7 +413,7 @@ fn make_thread_builder( thread::Builder::new() }; - if let Some(stack_size) = stack_size { + if let Some(stack_size) = builder.stack_size { thread_builder = thread_builder.stack_size(stack_size); } From e1608391d802b5fd972096d561ac75d339e944b7 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 16:09:44 -0700 Subject: [PATCH 24/95] Try removing miri cfg --- crates/bevy_tasks/src/task_pool.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 3fcb875166574..218be7df54952 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -394,9 +394,6 @@ fn make_thread_builder( prefix: &'static str, idx: usize, ) -> thread::Builder { - // miri does not support setting thread names - // TODO: change back when https://github.com/rust-lang/miri/issues/1717 is fixed - #[cfg(not(miri))] let mut thread_builder = { let thread_name = if let Some(ref thread_name) = builder.thread_name { format!("{} ({}, {})", thread_name, prefix, idx) @@ -406,13 +403,6 @@ fn make_thread_builder( thread::Builder::new().name(thread_name) }; - #[cfg(miri)] - let mut thread_builder = { - let _ = idx; - let _ = thread_name; - thread::Builder::new() - }; - if let Some(stack_size) = builder.stack_size { thread_builder = thread_builder.stack_size(stack_size); } From 9b933471da3eece1eb06ff9a10bfd14a155cbd2e Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 18:30:08 -0700 Subject: [PATCH 25/95] Review fixes for single_threaded_task_pool --- .../src/single_threaded_task_pool.rs | 29 ++----------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 2629a793744d2..58f124d115697 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -15,18 +15,17 @@ impl TaskPoolBuilder { Self::default() } - /// Override the number of compute-priority threads created for the pool. 
If unset, this default to the number - /// of logical cores of the system + /// No op on the single threaded task pool pub fn compute_threads(self, num_threads: usize) -> Self { self } - /// Override the number of async-compute priority threads created for the pool. If unset, this defaults to 0. + /// No op on the single threaded task pool pub fn async_compute_threads(self, num_threads: usize) -> Self { self } - /// Override the number of IO-priority threads created for the pool. If unset, this defaults to 0. + /// No op on the single threaded task pool pub fn io_threads(self, num_threads: usize) -> Self { self } @@ -206,28 +205,6 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { self.spawn_local(f); } - /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive - /// the provided future. The results of the future will be returned as a part of - /// [`TaskPool::scope`]'s return value. - /// - /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. - /// - /// For more information, see [`TaskPool::scope`]. - pub fn spawn_async_compute + 'scope>(&mut self, f: Fut) { - self.spawn_local(f); - } - - /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive - /// the provided future. The results of the future will be returned as a part of - /// [`TaskPool::scope`]'s return value. - /// - /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. - /// - /// For more information, see [`TaskPool::scope`]. - pub fn spawn_io + 'scope>(&mut self, f: Fut) { - self.spawn_local(f); - } - /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. From 291c09b0881661e0a38e82f3f268d0a4c1070787 Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 20:18:49 -0700 Subject: [PATCH 26/95] Add error logs if running without any threads in a given group --- crates/bevy_tasks/Cargo.toml | 1 + crates/bevy_tasks/src/task_pool.rs | 130 +++++++++++++++++++---------- 2 files changed, 86 insertions(+), 45 deletions(-) diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 7b83b9bc344bd..0b0cfda1db268 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -14,6 +14,7 @@ event-listener = "2.5.2" async-executor = "1.3.0" async-channel = "1.4.2" num_cpus = "1.0.1" +tracing = "0.1" [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 218be7df54952..4ff7f06cffbdd 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -72,7 +72,9 @@ impl TaskPoolBuilder { #[derive(Debug)] struct TaskPoolInner { - threads: Vec>, + compute_threads: Vec>, + async_compute_threads: Vec>, + io_threads: Vec>, shutdown_tx: async_channel::Sender<()>, } @@ -81,7 +83,19 @@ impl Drop for TaskPoolInner { self.shutdown_tx.close(); let panicking = thread::panicking(); - for join_handle in self.threads.drain(..) { + for join_handle in self.compute_threads.drain(..) { + let res = join_handle.join(); + if !panicking { + res.expect("Task thread panicked while executing."); + } + } + for join_handle in self.async_compute_threads.drain(..) { + let res = join_handle.join(); + if !panicking { + res.expect("Task thread panicked while executing."); + } + } + for join_handle in self.io_threads.drain(..) 
{ let res = join_handle.join(); if !panicking { res.expect("Task thread panicked while executing."); @@ -142,54 +156,61 @@ impl TaskPool { let io_threads = builder.io_threads.unwrap_or(0); let async_compute_threads = builder.async_compute_threads.unwrap_or(0); - let mut threads = Vec::with_capacity(compute_threads + io_threads + async_compute_threads); - threads.extend((0..compute_threads).map(|i| { - let compute = Arc::clone(&compute_executor); - let shutdown_rx = shutdown_rx.clone(); - make_thread_builder(&builder, "Compute", i) - .spawn(move || { - // Use unwrap_err because we expect a Closed error - future::block_on(compute.run(shutdown_rx.recv())).unwrap_err(); - }) - .expect("Failed to spawn thread.") - })); - threads.extend((0..io_threads).map(|i| { - let compute = Arc::clone(&compute_executor); - let io = Arc::clone(&io_executor); - let shutdown_rx = shutdown_rx.clone(); - make_thread_builder(&builder, "IO", i) - .spawn(move || { - let future = io - .run(shutdown_rx.recv()) - .or(compute.run(shutdown_rx.recv())); - // Use unwrap_err because we expect a Closed error - future::block_on(future).unwrap_err(); - }) - .expect("Failed to spawn thread.") - })); - threads.extend((0..async_compute_threads).map(|i| { - let compute = Arc::clone(&compute_executor); - let async_compute = Arc::clone(&compute_executor); - let io = Arc::clone(&io_executor); - let shutdown_rx = shutdown_rx.clone(); - make_thread_builder(&builder, "Aync Compute", i) - .spawn(move || { - let future = async_compute - .run(shutdown_rx.recv()) - .or(compute.run(shutdown_rx.recv())) - .or(io.run(shutdown_rx.recv())); - // Use unwrap_err because we expect a Closed error - future::block_on(future).unwrap_err(); - }) - .expect("Failed to spawn thread.") - })); + let compute_threads = (0..compute_threads) + .map(|i| { + let compute = Arc::clone(&compute_executor); + let shutdown_rx = shutdown_rx.clone(); + make_thread_builder(&builder, "Compute", i) + .spawn(move || { + // Use unwrap_err because we expect a Closed error + future::block_on(compute.run(shutdown_rx.recv())).unwrap_err(); + }) + .expect("Failed to spawn thread.") + }) + .collect(); + let io_threads = (0..io_threads) + .map(|i| { + let compute = Arc::clone(&compute_executor); + let io = Arc::clone(&io_executor); + let shutdown_rx = shutdown_rx.clone(); + make_thread_builder(&builder, "IO", i) + .spawn(move || { + let future = io + .run(shutdown_rx.recv()) + .or(compute.run(shutdown_rx.recv())); + // Use unwrap_err because we expect a Closed error + future::block_on(future).unwrap_err(); + }) + .expect("Failed to spawn thread.") + }) + .collect(); + let async_compute_threads = (0..async_compute_threads) + .map(|i| { + let compute = Arc::clone(&compute_executor); + let async_compute = Arc::clone(&compute_executor); + let io = Arc::clone(&io_executor); + let shutdown_rx = shutdown_rx.clone(); + make_thread_builder(&builder, "Aync Compute", i) + .spawn(move || { + let future = async_compute + .run(shutdown_rx.recv()) + .or(compute.run(shutdown_rx.recv())) + .or(io.run(shutdown_rx.recv())); + // Use unwrap_err because we expect a Closed error + future::block_on(future).unwrap_err(); + }) + .expect("Failed to spawn thread.") + }) + .collect(); Self { compute_executor, async_compute_executor, io_executor, inner: Arc::new(TaskPoolInner { - threads, + compute_threads, + async_compute_threads, + io_threads, shutdown_tx, }), } @@ -197,7 +218,18 @@ impl TaskPool { /// Return the number of threads owned by the task pool pub fn thread_num(&self) -> usize { - 
self.inner.threads.len() + self.thread_count_for(TaskGroup::Compute) + + self.thread_count_for(TaskGroup::AsyncCompute) + + self.thread_count_for(TaskGroup::IO) + } + + /// Return the number of threads owned by a given group in the task pool + pub fn thread_count_for(&self, group: TaskGroup) -> usize { + match group { + TaskGroup::Compute => self.inner.compute_threads.len(), + TaskGroup::IO => self.inner.io_threads.len(), + TaskGroup::AsyncCompute => self.inner.async_compute_threads.len(), + } } /// Allows spawning non-`'static` futures on the thread pool under the [`Compute`] task group. @@ -227,6 +259,10 @@ impl TaskPool { F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { + if self.thread_count_for(group) == 0 { + tracing::error!("Attempting to use TaskPool::scope with the {:?} task group, but there are no threads for it!", + group); + } // SAFETY: This function blocks until all futures complete, so this future must return // before this function returns. However, rust has no way of knowing // this so we must convert to 'static here to appease the compiler as it is unable to // validate safety. @@ -327,6 +363,10 @@ impl TaskPool { where T: Send + 'static, { + if self.thread_count_for(group) == 0 { + tracing::error!("Attempted to use TaskPool::spawn with the {:?} task group, but there are no threads for it!", + group); + } Task::new(match group { TaskGroup::Compute => self.compute_executor.spawn(future), TaskGroup::AsyncCompute => self.async_compute_executor.spawn(future), TaskGroup::IO => self.io_executor.spawn(future), }) } From 5123487013d65ca62712c0698656b1214dff326a Mon Sep 17 00:00:00 2001 From: james7132 Date: Sun, 15 May 2022 20:58:46 -0700 Subject: [PATCH 27/95] Merge DefaultTaskPoolOptions in. --- crates/bevy_core/src/lib.rs | 13 +- crates/bevy_core/src/task_pool_options.rs | 137 -------------------- crates/bevy_tasks/src/task_pool.rs | 150 +++++++++++++++------- 3 files changed, 110 insertions(+), 190 deletions(-) delete mode 100644 crates/bevy_core/src/task_pool_options.rs diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index 44b2be465b9cf..b6cb9c0445568 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -2,18 +2,16 @@ //! This crate provides core functionality for Bevy Engine. mod name; -mod task_pool_options; mod time; pub use bytemuck::{bytes_of, cast_slice, Pod, Zeroable}; pub use name::*; -pub use task_pool_options::*; pub use time::*; pub mod prelude { //! The Bevy Core Prelude. #[doc(hidden)] - pub use crate::{DefaultTaskPoolOptions, Name, Time, Timer}; + pub use crate::{Name, Time, Timer}; } use bevy_app::prelude::*; use bevy_ecs::{ schedule::{ExclusiveSystemDescriptorCoercion, SystemLabel}, system::IntoExclusiveSystem, }; +use bevy_tasks::TaskPool; use bevy_utils::HashSet; use std::ops::Range; @@ -39,12 +38,8 @@ pub enum CoreSystem { impl Plugin for CorePlugin { fn build(&self, app: &mut App) { - // Setup the default bevy task pools - app.world - .get_resource::() - .cloned() - .unwrap_or_default() - .create_default_pools(&mut app.world); + // Setup the default bevy task pool if not already set up + app.world.get_resource_or_insert_with(TaskPool::new); app.init_resource::