Skip to content

Commit

Permalink
Bevy tasks (bevyengine#3)
Browse files Browse the repository at this point in the history
* Address some feedback for the bevy_tasks PR
 - Add a general clamp fn to bevy_math
 - Expose num_cpus in bevy_tasks
 - Fill out empty doc comment

* Add comments and clean up pinning in bevy_tasks, fix a couple tests
  • Loading branch information
aclysma committed Aug 29, 2020
1 parent 60a27dd commit 6227612
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 34 deletions.
2 changes: 1 addition & 1 deletion crates/bevy_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ keywords = ["bevy"]
bevy_derive = { path = "../bevy_derive", version = "0.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
bevy_tasks = { path = "../bevy_tasks" }
num_cpus = "1"
bevy_math = { path = "../bevy_math", version = "0.1" }

# other
libloading = "0.6"
Expand Down
16 changes: 3 additions & 13 deletions crates/bevy_app/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};

fn clamp_usize(value: usize, min: usize, max: usize) -> usize {
if value > max {
max
} else if value < min {
min
} else {
value
}
}

/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
#[derive(Clone)]
Expand All @@ -36,7 +26,7 @@ impl TaskPoolThreadAssignmentPolicy {
// Clamp by min_threads, max_threads. (This may result in us using more threads than are
// available, this is intended. An example case where this might happen is a device with
// <= 2 threads.
clamp_usize(desired, self.min_threads, self.max_threads)
bevy_math::clamp(desired, self.min_threads, self.max_threads)
}
}

Expand Down Expand Up @@ -101,8 +91,8 @@ impl DefaultTaskPoolOptions {

/// Inserts the default thread pools into the given resource map based on the configured values
pub fn create_default_pools(&self, resources: &mut Resources) {
let total_threads = clamp_usize(
num_cpus::get(),
let total_threads = bevy_math::clamp(
bevy_tasks::logical_core_count(),
self.min_total_threads,
self.max_total_threads,
);
Expand Down
19 changes: 19 additions & 0 deletions crates/bevy_math/src/clamp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/// A value bounded by a minimum and a maximum
///
/// If input is less than min then this returns min.
/// If input is greater than max then this returns max.
/// Otherwise this returns input.
///
/// **Panics** in debug mode if `!(min <= max)`.
///
/// Original implementation from num-traits licensed as MIT
pub fn clamp<T: PartialOrd>(input: T, min: T, max: T) -> T {
debug_assert!(min <= max, "min must be less than or equal to max");
if input < min {
min
} else if input > max {
max
} else {
input
}
}
2 changes: 2 additions & 0 deletions crates/bevy_math/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod clamp;
mod face_toward;
mod geometry;

pub use clamp::*;
pub use face_toward::*;
pub use geometry::*;
pub use glam::*;
Expand Down
8 changes: 8 additions & 0 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ pub mod prelude {
usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool},
};
}

pub fn logical_core_count() -> usize {
num_cpus::get()
}

pub fn physical_core_count() -> usize {
num_cpus::get_physical()
}
15 changes: 13 additions & 2 deletions crates/bevy_tasks/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ mod tests {
let task_pool = TaskPool::new();
let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() });

println!("outputs: {:?}", outputs);
let mut sum = 0;
for output in outputs {
sum += output;
}

assert_eq!(sum, 1000 * 42);
}

#[test]
Expand All @@ -100,6 +105,12 @@ mod tests {
numbers.iter().sum()
});

println!("outputs: {:?}", outputs);
let mut sum = 0;
for output in outputs {
sum += output;
}

assert_eq!(sum, 1000 * 42 * 2);
assert_eq!(v[0], 84);
}
}
41 changes: 23 additions & 18 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,6 @@ use std::{
thread::{self, JoinHandle},
};

macro_rules! pin_mut {
($($x:ident),*) => { $(
// Move the value to ensure that it is owned
let mut $x = $x;
// Shadow the original binding so that it can't be directly accessed
// ever again.
#[allow(unused_mut)]
let mut $x = unsafe {
Pin::new_unchecked(&mut $x)
};
)* }
}

/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {
Expand Down Expand Up @@ -103,7 +90,7 @@ pub struct TaskPool {
/// Vec<Task<T>> contained within TaskPoolInner
executor: Arc<multitask::Executor>,

///
/// Inner state of the pool
inner: Arc<TaskPoolInner>,
}

Expand Down Expand Up @@ -187,7 +174,12 @@ impl TaskPool {
F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
T: Send + 'static,
{
let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) };
// SAFETY: This function blocks until all futures complete, so this future must return
// before this function returns. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let executor: &multitask::Executor = &*self.executor as &multitask::Executor;
let executor: &'scope multitask::Executor = unsafe { mem::transmute(executor) };

let fut = async move {
let mut scope = Scope {
Expand All @@ -205,11 +197,20 @@ impl TaskPool {
results
};

pin_mut!(fut);
// Move the value to ensure that it is owned
let mut fut = fut;

// Shadow the original binding so that it can't be directly accessed
// ever again.
let fut = unsafe { Pin::new_unchecked(&mut fut) };

// let fut: Pin<&mut (dyn Future<Output=()> + Send)> = fut;
// SAFETY: This function blocks until all futures complete, so we do not read/write the
// data from futures outside of the 'scope lifetime. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let fut: Pin<&mut (dyn Future<Output = Vec<T>> + Send)> = fut;
let fut: Pin<&'static mut (dyn Future<Output = Vec<T>> + Send + 'static)> =
unsafe { mem::transmute(fut as Pin<&mut (dyn Future<Output = Vec<T>> + Send)>) };
unsafe { mem::transmute(fut) };

pollster::block_on(self.executor.spawn(fut))
}
Expand Down Expand Up @@ -241,6 +242,10 @@ pub struct Scope<'scope, T> {

impl<'scope, T: Send + 'static> Scope<'scope, T> {
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
// SAFETY: This function blocks until all futures complete, so we do not read/write the
// data from futures outside of the 'scope lifetime. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let fut: Pin<Box<dyn Future<Output = T> + 'scope + Send>> = Box::pin(f);
let fut: Pin<Box<dyn Future<Output = T> + 'static + Send>> = unsafe { mem::transmute(fut) };

Expand Down

0 comments on commit 6227612

Please sign in to comment.