diff --git a/crates/bevy_app/Cargo.toml b/crates/bevy_app/Cargo.toml index 912b747953947..cde8462828aa1 100644 --- a/crates/bevy_app/Cargo.toml +++ b/crates/bevy_app/Cargo.toml @@ -13,6 +13,8 @@ keywords = ["bevy"] # bevy bevy_derive = { path = "../bevy_derive", version = "0.1" } bevy_ecs = { path = "../bevy_ecs", version = "0.1" } +bevy_tasks = { path = "../bevy_tasks" } +num_cpus = "1" # other libloading = "0.6" diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index c5a9fa59acf76..046f69b3a24b9 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -1,4 +1,5 @@ use crate::app_builder::AppBuilder; +use crate::DefaultTaskPoolOptions; use bevy_ecs::{ParallelExecutor, Resources, Schedule, World}; #[allow(clippy::needless_doctest_main)] @@ -63,7 +64,11 @@ impl App { } pub fn run(mut self) { - ParallelExecutor::initialize_pools(&mut self.resources); + // Setup the default bevy task pools + self.resources + .get_cloned::() + .unwrap_or_else(DefaultTaskPoolOptions::default) + .create_default_pools(&mut self.resources); self.startup_schedule.initialize(&mut self.resources); self.startup_executor.run( diff --git a/crates/bevy_app/src/lib.rs b/crates/bevy_app/src/lib.rs index d2e241616b55d..a409edbfafae3 100644 --- a/crates/bevy_app/src/lib.rs +++ b/crates/bevy_app/src/lib.rs @@ -8,6 +8,7 @@ mod app_builder; mod event; mod plugin; mod schedule_runner; +mod task_pool_options; pub use app::*; pub use app_builder::*; @@ -15,6 +16,7 @@ pub use bevy_derive::DynamicPlugin; pub use event::*; pub use plugin::*; pub use schedule_runner::*; +pub use task_pool_options::*; pub mod prelude { pub use crate::{ diff --git a/crates/bevy_app/src/task_pool_options.rs b/crates/bevy_app/src/task_pool_options.rs new file mode 100644 index 0000000000000..f5cdc9d6680c8 --- /dev/null +++ b/crates/bevy_app/src/task_pool_options.rs @@ -0,0 +1,157 @@ +use bevy_ecs::Resources; +use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder}; + +fn clamp_usize(value: usize, min: usize, max: usize) -> usize { + if value > max { + max + } else if value < min { + min + } else { + value + } +} + +/// Defines a simple way to determine how many threads to use given the number of remaining cores +/// and number of total cores +#[derive(Clone)] +pub struct TaskPoolThreadAssignmentPolicy { + /// Force using at least this many threads + pub min_threads: usize, + /// Under no circumstance use more than this many threads for this pool + pub max_threads: usize, + /// Target using this percentage of total cores, clamped by min_threads and max_threads. It is + /// permitted to use 1.0 to try to use all remaining threads + pub percent: f32, +} + +impl TaskPoolThreadAssignmentPolicy { + /// Determine the number of threads to use for this task pool + fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize { + assert!(self.percent >= 0.0); + let mut desired = (total_threads as f32 * self.percent).round() as usize; + + // Limit ourselves to the number of cores available + desired = desired.min(remaining_threads); + + // Clamp by min_threads, max_threads. (This may result in us using more threads than are + // available, this is intended. An example case where this might happen is a device with + // <= 2 threads. + clamp_usize(desired, self.min_threads, self.max_threads) + } +} + +/// Helper for configuring and creating the default task pools. For end-users who want full control, +/// insert the default task pools into the resource map manually. If the pools are already inserted, +/// this helper will do nothing. +#[derive(Clone)] +pub struct DefaultTaskPoolOptions { + /// If the number of physical cores is less than min_total_threads, force using min_total_threads + pub min_total_threads: usize, + /// If the number of physical cores is grater than max_total_threads, force using max_total_threads + pub max_total_threads: usize, + + /// Used to determine number of IO threads to allocate + pub io: TaskPoolThreadAssignmentPolicy, + /// Used to determine number of async compute threads to allocate + pub async_compute: TaskPoolThreadAssignmentPolicy, + /// Used to determine number of compute threads to allocate + pub compute: TaskPoolThreadAssignmentPolicy, +} + +impl Default for DefaultTaskPoolOptions { + fn default() -> Self { + DefaultTaskPoolOptions { + // By default, use however many cores are available on the system + min_total_threads: 1, + max_total_threads: std::usize::MAX, + + // Use 25% of cores for IO, at least 1, no more than 4 + io: TaskPoolThreadAssignmentPolicy { + min_threads: 1, + max_threads: 4, + percent: 0.25, + }, + + // Use 25% of cores for async compute, at least 1, no more than 4 + async_compute: TaskPoolThreadAssignmentPolicy { + min_threads: 1, + max_threads: 4, + percent: 0.25, + }, + + // Use all remaining cores for compute (at least 1) + compute: TaskPoolThreadAssignmentPolicy { + min_threads: 1, + max_threads: std::usize::MAX, + percent: 1.0, // This 1.0 here means "whatever is left over" + }, + } + } +} + +impl DefaultTaskPoolOptions { + /// Create a configuration that forces using the given number of threads. + pub fn with_num_threads(thread_count: usize) -> Self { + let mut options = Self::default(); + options.min_total_threads = thread_count; + options.max_total_threads = thread_count; + + options + } + + /// Inserts the default thread pools into the given resource map based on the configured values + pub fn create_default_pools(&self, resources: &mut Resources) { + let total_threads = clamp_usize( + num_cpus::get(), + self.min_total_threads, + self.max_total_threads, + ); + + let mut remaining_threads = total_threads; + + if !resources.contains::() { + // Determine the number of IO threads we will use + let io_threads = self + .io + .get_number_of_threads(remaining_threads, total_threads); + remaining_threads -= io_threads; + + resources.insert(IOTaskPool( + TaskPoolBuilder::default() + .num_threads(io_threads) + .thread_name("IO Task Pool".to_string()) + .build(), + )); + } + + if !resources.contains::() { + // Determine the number of async compute threads we will use + let async_compute_threads = self + .async_compute + .get_number_of_threads(remaining_threads, total_threads); + remaining_threads -= async_compute_threads; + + resources.insert(AsyncComputeTaskPool( + TaskPoolBuilder::default() + .num_threads(async_compute_threads) + .thread_name("Async Compute Task Pool".to_string()) + .build(), + )); + } + + if !resources.contains::() { + // Determine the number of compute threads we will use + // This is intentionally last so that an end user can specify 1.0 as the percent + let compute_threads = self + .compute + .get_number_of_threads(remaining_threads, total_threads); + + resources.insert(ComputeTaskPool( + TaskPoolBuilder::default() + .num_threads(compute_threads) + .thread_name("Compute Task Pool".to_string()) + .build(), + )); + } + } +} diff --git a/crates/bevy_ecs/src/resource/resources.rs b/crates/bevy_ecs/src/resource/resources.rs index 0d9d5afba0a0f..86f324d269a1c 100644 --- a/crates/bevy_ecs/src/resource/resources.rs +++ b/crates/bevy_ecs/src/resource/resources.rs @@ -42,6 +42,8 @@ impl Resources { self.get_resource_mut(ResourceIndex::Global) } + /// Returns a clone of the underlying resource, this is helpful when borrowing something + /// cloneable (like a task pool) without taking a borrow on the resource map pub fn get_cloned(&self) -> Option { self.get::().map(|r| (*r).clone()) } diff --git a/crates/bevy_ecs/src/schedule/parallel_executor.rs b/crates/bevy_ecs/src/schedule/parallel_executor.rs index 630fadb83f100..c8b6585fd2b74 100644 --- a/crates/bevy_ecs/src/schedule/parallel_executor.rs +++ b/crates/bevy_ecs/src/schedule/parallel_executor.rs @@ -35,18 +35,6 @@ impl Default for ParallelExecutor { } impl ParallelExecutor { - pub fn initialize_pools(resources: &mut Resources) { - let compute_pool: bevy_tasks::ComputePool = resources - .get::() - .map(|options| (*options).clone()) - .unwrap_or_else(ParallelExecutorOptions::default) - .create_builder() - .build(); - - // For now, bevy_ecs only uses the global task pool so it is sufficient to configure it once here. - resources.insert(compute_pool); - } - pub fn without_tracker_clears() -> Self { Self { clear_trackers: false, @@ -77,52 +65,6 @@ impl ParallelExecutor { } } -/// This can be added as an app resource to control the global `bevy_tasks::TaskPool` used by ecs. -// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync. -#[derive(Debug, Default, Clone)] -pub struct ParallelExecutorOptions { - /// If some value, we'll set up the thread pool to use at most n threads. See `bevy_tasks::TaskPoolBuilder::num_threads`. - num_threads: Option, - /// If some value, we'll set up the thread pool's' workers to the given stack size. See `bevy_tasks::TaskPoolBuilder::stack_size`. - stack_size: Option, - // TODO: Do we also need/want to expose other features (*_handler, etc.) -} - -impl ParallelExecutorOptions { - /// Creates a new ParallelExecutorOptions instance - pub fn new() -> Self { - Self::default() - } - - /// Sets the num_threads option, using the builder pattern - pub fn with_num_threads(mut self, num_threads: Option) -> Self { - self.num_threads = num_threads; - self - } - - /// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing, - /// otherwise your application may run into stability and performance issues. - pub fn with_stack_size(mut self, stack_size: Option) -> Self { - self.stack_size = stack_size; - self - } - - /// Creates a new ThreadPoolBuilder based on the current options. - pub(crate) fn create_builder(&self) -> bevy_tasks::TaskPoolBuilder { - let mut builder = bevy_tasks::TaskPoolBuilder::new(); - - if let Some(num_threads) = self.num_threads { - builder = builder.num_threads(num_threads); - } - - if let Some(stack_size) = self.stack_size { - builder = builder.stack_size(stack_size); - } - - builder - } -} - #[derive(Debug, Clone)] pub struct ExecutorStage { /// each system's set of dependencies @@ -325,11 +267,6 @@ impl ExecutorStage { system.run(world, resources); sender.send(system_index).unwrap(); }); - // scope.spawn_fifo(move |_| { - // let mut system = system.lock(); - // system.run(world, resources); - // sender.send(system_index).unwrap(); - // }); systems_currently_running = true; } @@ -345,7 +282,9 @@ impl ExecutorStage { systems: &[Arc>>], schedule_changed: bool, ) { - let compute_pool = resources.get_cloned::().unwrap(); + let compute_pool = resources + .get_cloned::() + .unwrap(); // if the schedule has changed, clear executor state / fill it with new defaults if schedule_changed { diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index fef7a462b587a..38ee408ee2682 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bevy_tasks" -version = "0.1.0" +version = "0.1.3" authors = [ "Bevy Contributors ", "Lachlan Sneff ", diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 5075f5fc55e54..1757591740e86 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -1,296 +1,18 @@ -use parking::Unparker; -use std::{ - future::Future, - marker::PhantomData, - mem, - pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - thread::{self, JoinHandle}, -}; - mod slice; pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; pub use task::Task; -mod usages; -pub use usages::Compute; - -pub type ComputePool = TaskPool; - -macro_rules! pin_mut { - ($($x:ident),*) => { $( - // Move the value to ensure that it is owned - let mut $x = $x; - // Shadow the original binding so that it can't be directly accessed - // ever again. - #[allow(unused_mut)] - let mut $x = unsafe { - Pin::new_unchecked(&mut $x) - }; - )* } -} - -/// Used to create a TaskPool -#[derive(Debug, Default, Clone)] -pub struct TaskPoolBuilder { - /// If set, we'll set up the thread pool to use at most n threads. Otherwise use - /// the logical core count of the system - num_threads: Option, - /// If set, we'll use the given stack size rather than the system default - stack_size: Option, - /// Allows customizing the name of the threads - helpful for debugging. If set, threads will - /// be named (), i.e. "MyThreadPool (2)" - thread_name: Option, -} - -impl TaskPoolBuilder { - /// Creates a new TaskPoolBuilder instance - pub fn new() -> Self { - Self::default() - } - - /// Override the number of threads created for the pool. If unset, we default to the number - /// of logical cores of the system - pub fn num_threads(mut self, num_threads: usize) -> Self { - self.num_threads = Some(num_threads); - self - } - - /// Override the stack size of the threads created for the pool - pub fn stack_size(mut self, stack_size: usize) -> Self { - self.stack_size = Some(stack_size); - self - } - - /// Override the name of the threads created for the pool. If set, threads will - /// be named (), i.e. "MyThreadPool (2)" - pub fn thread_name(mut self, thread_name: String) -> Self { - self.thread_name = Some(thread_name); - self - } - - /// Creates a new ThreadPoolBuilder based on the current options. - pub fn build(self) -> TaskPool { - TaskPool::new_internal( - self.num_threads, - self.stack_size, - self.thread_name.as_deref(), - ) - } -} - -struct TaskPoolInternal { - threads: Vec<(JoinHandle<()>, Arc)>, - shutdown_flag: Arc, -} - -impl Drop for TaskPoolInternal { - fn drop(&mut self) { - self.shutdown_flag.store(true, Ordering::Release); - - for (_, unparker) in &self.threads { - unparker.unpark(); - } - for (join_handle, _) in self.threads.drain(..) { - join_handle - .join() - .expect("task thread panicked while executing"); - } - } -} - -/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by -/// the pool on threads owned by the pool. -pub struct TaskPool { - executor: Arc, - internal: Arc, - _marker: PhantomData, -} - -impl TaskPool { - /// Create a `TaskPool` with the default configuration. - pub fn new() -> Self { - TaskPoolBuilder::new().build() - } - - fn new_internal( - num_threads: Option, - stack_size: Option, - thread_name: Option<&str>, - ) -> Self { - let executor = Arc::new(multitask::Executor::new()); - let shutdown_flag = Arc::new(AtomicBool::new(false)); - - let num_threads = num_threads.unwrap_or_else(num_cpus::get); - - let threads = (0..num_threads) - .map(|i| { - let ex = Arc::clone(&executor); - let flag = Arc::clone(&shutdown_flag); - let (p, u) = parking::pair(); - let unparker = Arc::new(u); - let u = Arc::clone(&unparker); - // Run an executor thread. - - let thread_name = if let Some(thread_name) = thread_name { - format!("{} ({})", thread_name, i) - } else { - format!("TaskPool ({})", i) - }; - - let mut thread_builder = thread::Builder::new().name(thread_name); - - if let Some(stack_size) = stack_size { - thread_builder = thread_builder.stack_size(stack_size); - } - - let handle = thread_builder - .spawn(move || { - let ticker = ex.ticker(move || u.unpark()); - loop { - if flag.load(Ordering::Acquire) { - break; - } +mod task_pool; +pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; - if !ticker.tick() { - p.park(); - } - } - }) - .expect("failed to spawn thread"); - - (handle, unparker) - }) - .collect(); - - Self { - executor, - internal: Arc::new(TaskPoolInternal { - threads, - shutdown_flag, - }), - _marker: PhantomData, - } - } - - /// Return the number of threads owned by the task pool - pub fn thread_num(&self) -> usize { - self.internal.threads.len() - } - - /// Allows spawning non-`static futures on the thread pool. The function takes a callback, - /// passing a scope object into it. The scope object provided to the callback can be used - /// to spawn tasks. This function will await the completion of all tasks before returning. - /// - /// This is similar to `rayon::scope` and `crossbeam::scope` - pub fn scope<'scope, F, T>(&self, f: F) -> Vec - where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, - T: Send + 'static, - { - let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) }; - - let fut = async move { - let mut scope = Scope { - executor, - spawned: Vec::new(), - }; - - f(&mut scope); - - let mut results = Vec::with_capacity(scope.spawned.len()); - for task in scope.spawned { - results.push(task.await); - } - - results - }; - - pin_mut!(fut); - - // let fut: Pin<&mut (dyn Future + Send)> = fut; - let fut: Pin<&'static mut (dyn Future> + Send + 'static)> = - unsafe { mem::transmute(fut as Pin<&mut (dyn Future> + Send)>) }; - - pollster::block_on(self.executor.spawn(fut)) - } - - /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be - /// cancelled and "detached" allowing it to continue running without having to be polled by the - /// end-user. - pub fn spawn( - &self, - future: impl Future + Send + 'static, - ) -> impl Future + Send - where - T: Send + 'static, - { - self.executor.spawn(future) - } -} - -impl Default for TaskPool { - fn default() -> Self { - Self::new() - } -} - -impl Clone for TaskPool { - fn clone(&self) -> Self { - Self { - executor: Arc::clone(&self.executor), - internal: Arc::clone(&self.internal), - _marker: PhantomData, - } - } -} - -pub struct Scope<'scope, T> { - executor: &'scope multitask::Executor, - spawned: Vec>, -} - -impl<'scope, T: Send + 'static> Scope<'scope, T> { - pub fn spawn + 'scope + Send>(&mut self, f: Fut) { - let fut: Pin + 'scope + Send>> = Box::pin(f); - let fut: Pin + 'static + Send>> = unsafe { mem::transmute(fut) }; - - let task = self.executor.spawn(fut); - self.spawned.push(task); - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - pub fn test_spawn() { - let pool = TaskPool::::new(); - - let foo = Box::new(42); - let foo = &*foo; - - let outputs = pool.scope(|scope| { - for i in 0..100 { - scope.spawn(async move { - println!("task {}", i); - if *foo != 42 { - panic!("not 42!?!?") - } else { - *foo - } - }); - } - }); +mod usages; +pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool}; - for output in outputs { - assert_eq!(output, 42); - } - } +pub mod prelude { + pub use crate::{ + slice::{ParallelSlice, ParallelSliceMut}, + usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool}, + }; } diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index bc2465a77a9fa..3461b0bc20756 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,12 +1,7 @@ use super::TaskPool; pub trait ParallelSlice: AsRef<[T]> { - fn par_chunk_map( - &self, - task_pool: &TaskPool, - chunk_size: usize, - f: F, - ) -> Vec + fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(&[T]) -> R + Send + Sync, R: Send + 'static, @@ -20,12 +15,7 @@ pub trait ParallelSlice: AsRef<[T]> { }) } - fn par_splat_map( - &self, - task_pool: &TaskPool, - max_tasks: Option, - f: F, - ) -> Vec + fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec where F: Fn(&[T]) -> R + Send + Sync, R: Send + 'static, @@ -46,12 +36,7 @@ pub trait ParallelSlice: AsRef<[T]> { impl ParallelSlice for S where S: AsRef<[T]> {} pub trait ParallelSliceMut: AsMut<[T]> { - fn par_chunk_map_mut( - &mut self, - task_pool: &TaskPool, - chunk_size: usize, - f: F, - ) -> Vec + fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(&mut [T]) -> R + Send + Sync, R: Send + 'static, @@ -65,9 +50,9 @@ pub trait ParallelSliceMut: AsMut<[T]> { }) } - fn par_splat_map_mut( + fn par_splat_map_mut( &mut self, - task_pool: &TaskPool, + task_pool: &TaskPool, max_tasks: Option, f: F, ) -> Vec diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs new file mode 100644 index 0000000000000..e719f1e40e269 --- /dev/null +++ b/crates/bevy_tasks/src/task_pool.rs @@ -0,0 +1,280 @@ +use parking::Unparker; +use std::{ + future::Future, + mem, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, JoinHandle}, +}; + +macro_rules! pin_mut { + ($($x:ident),*) => { $( + // Move the value to ensure that it is owned + let mut $x = $x; + // Shadow the original binding so that it can't be directly accessed + // ever again. + #[allow(unused_mut)] + let mut $x = unsafe { + Pin::new_unchecked(&mut $x) + }; + )* } +} + +/// Used to create a TaskPool +#[derive(Debug, Default, Clone)] +pub struct TaskPoolBuilder { + /// If set, we'll set up the thread pool to use at most n threads. Otherwise use + /// the logical core count of the system + num_threads: Option, + /// If set, we'll use the given stack size rather than the system default + stack_size: Option, + /// Allows customizing the name of the threads - helpful for debugging. If set, threads will + /// be named (), i.e. "MyThreadPool (2)" + thread_name: Option, +} + +impl TaskPoolBuilder { + /// Creates a new TaskPoolBuilder instance + pub fn new() -> Self { + Self::default() + } + + /// Override the number of threads created for the pool. If unset, we default to the number + /// of logical cores of the system + pub fn num_threads(mut self, num_threads: usize) -> Self { + self.num_threads = Some(num_threads); + self + } + + /// Override the stack size of the threads created for the pool + pub fn stack_size(mut self, stack_size: usize) -> Self { + self.stack_size = Some(stack_size); + self + } + + /// Override the name of the threads created for the pool. If set, threads will + /// be named (), i.e. "MyThreadPool (2)" + pub fn thread_name(mut self, thread_name: String) -> Self { + self.thread_name = Some(thread_name); + self + } + + /// Creates a new ThreadPoolBuilder based on the current options. + pub fn build(self) -> TaskPool { + TaskPool::new_internal( + self.num_threads, + self.stack_size, + self.thread_name.as_deref(), + ) + } +} + +struct TaskPoolInner { + threads: Vec<(JoinHandle<()>, Arc)>, + shutdown_flag: Arc, +} + +impl Drop for TaskPoolInner { + fn drop(&mut self) { + self.shutdown_flag.store(true, Ordering::Release); + + for (_, unparker) in &self.threads { + unparker.unpark(); + } + for (join_handle, _) in self.threads.drain(..) { + join_handle + .join() + .expect("task thread panicked while executing"); + } + } +} + +/// A thread pool for executing tasks. Tasks are futures that are being automatically driven by +/// the pool on threads owned by the pool. +#[derive(Clone)] +pub struct TaskPool { + /// The executor for the pool + /// + /// This has to be separate from TaskPoolInner because we have to create an Arc to + /// pass into the worker threads, and we must create the worker threads before we can create the + /// Vec> contained within TaskPoolInner + executor: Arc, + + /// + inner: Arc, +} + +impl TaskPool { + /// Create a `TaskPool` with the default configuration. + pub fn new() -> Self { + TaskPoolBuilder::new().build() + } + + fn new_internal( + num_threads: Option, + stack_size: Option, + thread_name: Option<&str>, + ) -> Self { + let executor = Arc::new(multitask::Executor::new()); + let shutdown_flag = Arc::new(AtomicBool::new(false)); + + let num_threads = num_threads.unwrap_or_else(num_cpus::get); + + let threads = (0..num_threads) + .map(|i| { + let ex = Arc::clone(&executor); + let flag = Arc::clone(&shutdown_flag); + let (p, u) = parking::pair(); + let unparker = Arc::new(u); + let u = Arc::clone(&unparker); + // Run an executor thread. + + let thread_name = if let Some(thread_name) = thread_name { + format!("{} ({})", thread_name, i) + } else { + format!("TaskPool ({})", i) + }; + + let mut thread_builder = thread::Builder::new().name(thread_name); + + if let Some(stack_size) = stack_size { + thread_builder = thread_builder.stack_size(stack_size); + } + + let handle = thread_builder + .spawn(move || { + let ticker = ex.ticker(move || u.unpark()); + loop { + if flag.load(Ordering::Acquire) { + break; + } + + if !ticker.tick() { + p.park(); + } + } + }) + .expect("failed to spawn thread"); + + (handle, unparker) + }) + .collect(); + + Self { + executor, + inner: Arc::new(TaskPoolInner { + threads, + shutdown_flag, + }), + } + } + + /// Return the number of threads owned by the task pool + pub fn thread_num(&self) -> usize { + self.inner.threads.len() + } + + /// Allows spawning non-`static futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + pub fn scope<'scope, F, T>(&self, f: F) -> Vec + where + F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + T: Send + 'static, + { + let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) }; + + let fut = async move { + let mut scope = Scope { + executor, + spawned: Vec::new(), + }; + + f(&mut scope); + + let mut results = Vec::with_capacity(scope.spawned.len()); + for task in scope.spawned { + results.push(task.await); + } + + results + }; + + pin_mut!(fut); + + // let fut: Pin<&mut (dyn Future + Send)> = fut; + let fut: Pin<&'static mut (dyn Future> + Send + 'static)> = + unsafe { mem::transmute(fut as Pin<&mut (dyn Future> + Send)>) }; + + pollster::block_on(self.executor.spawn(fut)) + } + + /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be + /// cancelled and "detached" allowing it to continue running without having to be polled by the + /// end-user. + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> impl Future + Send + where + T: Send + 'static, + { + self.executor.spawn(future) + } +} + +impl Default for TaskPool { + fn default() -> Self { + Self::new() + } +} + +pub struct Scope<'scope, T> { + executor: &'scope multitask::Executor, + spawned: Vec>, +} + +impl<'scope, T: Send + 'static> Scope<'scope, T> { + pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + let fut: Pin + 'scope + Send>> = Box::pin(f); + let fut: Pin + 'static + Send>> = unsafe { mem::transmute(fut) }; + + let task = self.executor.spawn(fut); + self.spawned.push(task); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + pub fn test_spawn() { + let pool = TaskPool::::new(); + + let foo = Box::new(42); + let foo = &*foo; + + let outputs = pool.scope(|scope| { + for i in 0..100 { + scope.spawn(async move { + println!("task {}", i); + if *foo != 42 { + panic!("not 42!?!?") + } else { + *foo + } + }); + } + }); + + for output in outputs { + assert_eq!(output, 42); + } + } +} diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 7cd15df5e44e5..f604aae70a73f 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,4 +1,52 @@ -//! Several premade usage profiles -//! Just `Compute` for now. +//! Definitions for a few common task pools that we want. Generally the determining factor for what +//! kind of work should go in each pool is latency requirements. +//! +//! For CPU-intensive work (tasks that generally spin until completion) we have a standard Compute +//! pool and an AsyncCompute pool. Work that does not need to be completed to present the next +//! frame should go to the AsyncCompute pool +//! +//! For IO-intensive work (tasks that spend very little time in a "woken" state) we have an IO +//! task pool. The tasks here are expected to complete very quickly. Generally they should just +//! await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready +//! for consumption. (likely via channels) -pub struct Compute(()); +use super::TaskPool; +use std::ops::Deref; + +/// A newtype for a task pool for CPU-intensive work that must be completed to deliver the next +/// frame +#[derive(Clone)] +pub struct ComputeTaskPool(pub TaskPool); + +impl Deref for ComputeTaskPool { + type Target = TaskPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// A newtype for a task pool for CPU-intensive work that may span across multiple frames +#[derive(Clone)] +pub struct AsyncComputeTaskPool(pub TaskPool); + +impl Deref for AsyncComputeTaskPool { + type Target = TaskPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a +/// "woken" state) +#[derive(Clone)] +pub struct IOTaskPool(pub TaskPool); + +impl Deref for IOTaskPool { + type Target = TaskPool; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/examples/app/thread_pool_resources.rs b/examples/app/thread_pool_resources.rs index 99f3016236748..1da1b867c686d 100644 --- a/examples/app/thread_pool_resources.rs +++ b/examples/app/thread_pool_resources.rs @@ -1,10 +1,11 @@ use bevy::{ecs::ParallelExecutorOptions, prelude::*}; +use bevy_ecs::DefaultTaskPoolOptions; /// This example illustrates how to customize the thread pool used internally (e.g. to only use a /// certain number of threads). fn main() { App::build() - .add_resource(ParallelExecutorOptions::new().with_num_threads(Some(4))) + .add_resource(DefaultTaskPoolOptions::with_num_threads(4)) .add_default_plugins() .run(); } diff --git a/src/lib.rs b/src/lib.rs index b08856e41e7fd..b0a00b83fabf3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,12 +53,12 @@ pub use bevy_property as property; pub use bevy_render as render; pub use bevy_scene as scene; pub use bevy_sprite as sprite; +pub use bevy_tasks as tasks; pub use bevy_text as text; pub use bevy_transform as transform; pub use bevy_type_registry as type_registry; pub use bevy_ui as ui; pub use bevy_window as window; -pub use bevy_tasks as tasks; #[cfg(feature = "bevy_audio")] pub use bevy_audio as audio;