Skip to content

Commit

Permalink
Add default task pools and method for configuring them (bevyengine#2)
Browse files Browse the repository at this point in the history
* Remove generics from TaskPool newtypes and some of the task API
Add IO, AsyncCompute and Compute TaskPools
Move TaskPool setup from bevy_ecs to bevy_app
ParallelExecutorOptions is essentially replaced by DefaultTaskPoolOptions

* Pull TaskPool and related types out of bevy_tasks/lib.rs into a separate module
Add a prelude to bevy_tasks
Update the version of bevy_tasks to match other crates

* Assert percent of cores >= 0
  • Loading branch information
aclysma authored Aug 28, 2020
1 parent 93fcd7d commit e406b07
Show file tree
Hide file tree
Showing 13 changed files with 521 additions and 378 deletions.
2 changes: 2 additions & 0 deletions crates/bevy_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ keywords = ["bevy"]
# bevy
bevy_derive = { path = "../bevy_derive", version = "0.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
bevy_tasks = { path = "../bevy_tasks" }
num_cpus = "1"

# other
libloading = "0.6"
Expand Down
7 changes: 6 additions & 1 deletion crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::app_builder::AppBuilder;
use crate::DefaultTaskPoolOptions;
use bevy_ecs::{ParallelExecutor, Resources, Schedule, World};

#[allow(clippy::needless_doctest_main)]
Expand Down Expand Up @@ -63,7 +64,11 @@ impl App {
}

pub fn run(mut self) {
ParallelExecutor::initialize_pools(&mut self.resources);
// Setup the default bevy task pools
self.resources
.get_cloned::<DefaultTaskPoolOptions>()
.unwrap_or_else(DefaultTaskPoolOptions::default)
.create_default_pools(&mut self.resources);

self.startup_schedule.initialize(&mut self.resources);
self.startup_executor.run(
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ mod app_builder;
mod event;
mod plugin;
mod schedule_runner;
mod task_pool_options;

pub use app::*;
pub use app_builder::*;
pub use bevy_derive::DynamicPlugin;
pub use event::*;
pub use plugin::*;
pub use schedule_runner::*;
pub use task_pool_options::*;

pub mod prelude {
pub use crate::{
Expand Down
157 changes: 157 additions & 0 deletions crates/bevy_app/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};

fn clamp_usize(value: usize, min: usize, max: usize) -> usize {
if value > max {
max
} else if value < min {
min
} else {
value
}
}

/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
#[derive(Clone)]
pub struct TaskPoolThreadAssignmentPolicy {
/// Force using at least this many threads
pub min_threads: usize,
/// Under no circumstance use more than this many threads for this pool
pub max_threads: usize,
/// Target using this percentage of total cores, clamped by min_threads and max_threads. It is
/// permitted to use 1.0 to try to use all remaining threads
pub percent: f32,
}

impl TaskPoolThreadAssignmentPolicy {
/// Determine the number of threads to use for this task pool
fn get_number_of_threads(&self, remaining_threads: usize, total_threads: usize) -> usize {
assert!(self.percent >= 0.0);
let mut desired = (total_threads as f32 * self.percent).round() as usize;

// Limit ourselves to the number of cores available
desired = desired.min(remaining_threads);

// Clamp by min_threads, max_threads. (This may result in us using more threads than are
// available, this is intended. An example case where this might happen is a device with
// <= 2 threads.
clamp_usize(desired, self.min_threads, self.max_threads)
}
}

/// Helper for configuring and creating the default task pools. For end-users who want full control,
/// insert the default task pools into the resource map manually. If the pools are already inserted,
/// this helper will do nothing.
#[derive(Clone)]
pub struct DefaultTaskPoolOptions {
/// If the number of physical cores is less than min_total_threads, force using min_total_threads
pub min_total_threads: usize,
/// If the number of physical cores is grater than max_total_threads, force using max_total_threads
pub max_total_threads: usize,

/// Used to determine number of IO threads to allocate
pub io: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of async compute threads to allocate
pub async_compute: TaskPoolThreadAssignmentPolicy,
/// Used to determine number of compute threads to allocate
pub compute: TaskPoolThreadAssignmentPolicy,
}

impl Default for DefaultTaskPoolOptions {
fn default() -> Self {
DefaultTaskPoolOptions {
// By default, use however many cores are available on the system
min_total_threads: 1,
max_total_threads: std::usize::MAX,

// Use 25% of cores for IO, at least 1, no more than 4
io: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use 25% of cores for async compute, at least 1, no more than 4
async_compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: 4,
percent: 0.25,
},

// Use all remaining cores for compute (at least 1)
compute: TaskPoolThreadAssignmentPolicy {
min_threads: 1,
max_threads: std::usize::MAX,
percent: 1.0, // This 1.0 here means "whatever is left over"
},
}
}
}

impl DefaultTaskPoolOptions {
/// Create a configuration that forces using the given number of threads.
pub fn with_num_threads(thread_count: usize) -> Self {
let mut options = Self::default();
options.min_total_threads = thread_count;
options.max_total_threads = thread_count;

options
}

/// Inserts the default thread pools into the given resource map based on the configured values
pub fn create_default_pools(&self, resources: &mut Resources) {
let total_threads = clamp_usize(
num_cpus::get(),
self.min_total_threads,
self.max_total_threads,
);

let mut remaining_threads = total_threads;

if !resources.contains::<IOTaskPool>() {
// Determine the number of IO threads we will use
let io_threads = self
.io
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= io_threads;

resources.insert(IOTaskPool(
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
.build(),
));
}

if !resources.contains::<AsyncComputeTaskPool>() {
// Determine the number of async compute threads we will use
let async_compute_threads = self
.async_compute
.get_number_of_threads(remaining_threads, total_threads);
remaining_threads -= async_compute_threads;

resources.insert(AsyncComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
.build(),
));
}

if !resources.contains::<ComputeTaskPool>() {
// Determine the number of compute threads we will use
// This is intentionally last so that an end user can specify 1.0 as the percent
let compute_threads = self
.compute
.get_number_of_threads(remaining_threads, total_threads);

resources.insert(ComputeTaskPool(
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
.build(),
));
}
}
}
2 changes: 2 additions & 0 deletions crates/bevy_ecs/src/resource/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ impl Resources {
self.get_resource_mut(ResourceIndex::Global)
}

/// Returns a clone of the underlying resource, this is helpful when borrowing something
/// cloneable (like a task pool) without taking a borrow on the resource map
pub fn get_cloned<T: Resource + Clone>(&self) -> Option<T> {
self.get::<T>().map(|r| (*r).clone())
}
Expand Down
67 changes: 3 additions & 64 deletions crates/bevy_ecs/src/schedule/parallel_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,6 @@ impl Default for ParallelExecutor {
}

impl ParallelExecutor {
pub fn initialize_pools(resources: &mut Resources) {
let compute_pool: bevy_tasks::ComputePool = resources
.get::<ParallelExecutorOptions>()
.map(|options| (*options).clone())
.unwrap_or_else(ParallelExecutorOptions::default)
.create_builder()
.build();

// For now, bevy_ecs only uses the global task pool so it is sufficient to configure it once here.
resources.insert(compute_pool);
}

pub fn without_tracker_clears() -> Self {
Self {
clear_trackers: false,
Expand Down Expand Up @@ -77,52 +65,6 @@ impl ParallelExecutor {
}
}

/// This can be added as an app resource to control the global `bevy_tasks::TaskPool` used by ecs.
// Dev internal note: We cannot directly expose a ThreadPoolBuilder here as it does not implement Send and Sync.
#[derive(Debug, Default, Clone)]
pub struct ParallelExecutorOptions {
/// If some value, we'll set up the thread pool to use at most n threads. See `bevy_tasks::TaskPoolBuilder::num_threads`.
num_threads: Option<usize>,
/// If some value, we'll set up the thread pool's' workers to the given stack size. See `bevy_tasks::TaskPoolBuilder::stack_size`.
stack_size: Option<usize>,
// TODO: Do we also need/want to expose other features (*_handler, etc.)
}

impl ParallelExecutorOptions {
/// Creates a new ParallelExecutorOptions instance
pub fn new() -> Self {
Self::default()
}

/// Sets the num_threads option, using the builder pattern
pub fn with_num_threads(mut self, num_threads: Option<usize>) -> Self {
self.num_threads = num_threads;
self
}

/// Sets the stack_size option, using the builder pattern. WARNING: Only use this if you know what you're doing,
/// otherwise your application may run into stability and performance issues.
pub fn with_stack_size(mut self, stack_size: Option<usize>) -> Self {
self.stack_size = stack_size;
self
}

/// Creates a new ThreadPoolBuilder based on the current options.
pub(crate) fn create_builder(&self) -> bevy_tasks::TaskPoolBuilder {
let mut builder = bevy_tasks::TaskPoolBuilder::new();

if let Some(num_threads) = self.num_threads {
builder = builder.num_threads(num_threads);
}

if let Some(stack_size) = self.stack_size {
builder = builder.stack_size(stack_size);
}

builder
}
}

#[derive(Debug, Clone)]
pub struct ExecutorStage {
/// each system's set of dependencies
Expand Down Expand Up @@ -325,11 +267,6 @@ impl ExecutorStage {
system.run(world, resources);
sender.send(system_index).unwrap();
});
// scope.spawn_fifo(move |_| {
// let mut system = system.lock();
// system.run(world, resources);
// sender.send(system_index).unwrap();
// });

systems_currently_running = true;
}
Expand All @@ -345,7 +282,9 @@ impl ExecutorStage {
systems: &[Arc<Mutex<Box<dyn System>>>],
schedule_changed: bool,
) {
let compute_pool = resources.get_cloned::<bevy_tasks::ComputePool>().unwrap();
let compute_pool = resources
.get_cloned::<bevy_tasks::ComputeTaskPool>()
.unwrap();

// if the schedule has changed, clear executor state / fill it with new defaults
if schedule_changed {
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bevy_tasks"
version = "0.1.0"
version = "0.1.3"
authors = [
"Bevy Contributors <[email protected]>",
"Lachlan Sneff <[email protected]>",
Expand Down
Loading

0 comments on commit e406b07

Please sign in to comment.