Skip to content

Commit

Permalink
change scope to take a thread executor
Browse files Browse the repository at this point in the history
  • Loading branch information
hymm committed Nov 8, 2022
1 parent d0a6615 commit 7186d73
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 110 deletions.
15 changes: 11 additions & 4 deletions crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use bevy_ecs::{
event::{Event, Events},
prelude::FromWorld,
schedule::{
IntoSystemDescriptor, Schedule, ShouldRun, Stage, StageLabel, State, StateData, SystemSet,
SystemStage,
IntoSystemDescriptor, MainThreadExecutor, Schedule, ShouldRun, Stage, StageLabel, State,
StateData, SystemSet, SystemStage,
},
system::Resource,
world::World,
Expand Down Expand Up @@ -153,7 +153,11 @@ impl App {
pub fn update(&mut self) {
#[cfg(feature = "trace")]
let _bevy_frame_update_span = info_span!("frame").entered();
ComputeTaskPool::init(TaskPool::default).scope(|scope| {
let thread_executor = self
.world
.get_resource::<MainThreadExecutor>()
.map(|e| e.0.clone());
ComputeTaskPool::init(TaskPool::default).scope(thread_executor, |scope| {
if self.run_once {
for sub_app in self.sub_apps.values_mut() {
(sub_app.extract)(&mut self.world, &mut sub_app.app);
Expand Down Expand Up @@ -1001,10 +1005,13 @@ impl App {
pub fn add_sub_app(
&mut self,
label: impl AppLabel,
app: App,
mut app: App,
sub_app_extract: impl Fn(&mut World, &mut App) + 'static + Send + Sync,
sub_app_runner: impl Fn(&mut App) + 'static + Send + Sync,
) -> &mut Self {
if let Some(executor) = self.world.get_resource::<MainThreadExecutor>() {
app.world.insert_resource(executor.clone());
}
self.sub_apps.insert(
label.as_label(),
SubApp {
Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod name;
mod serde;
mod task_pool_options;

use bevy_ecs::schedule::MainThreadExecutor;
use bevy_ecs::system::Resource;
pub use bytemuck::{bytes_of, cast_slice, Pod, Zeroable};
pub use name::*;
Expand Down Expand Up @@ -40,6 +41,7 @@ impl Plugin for CorePlugin {
fn build(&self, app: &mut App) {
// Setup the default bevy task pools
self.task_pool_options.create_default_pools();
app.insert_resource(MainThreadExecutor::new());

#[cfg(not(target_arch = "wasm32"))]
app.add_system_to_stage(
Expand Down
8 changes: 1 addition & 7 deletions crates/bevy_core/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use bevy_ecs::prelude::Resource;
use bevy_tasks::{
AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, MainThreadExecutor, TaskPoolBuilder,
};
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool, TaskPoolBuilder};
use bevy_utils::tracing::trace;

/// Defines a simple way to determine how many threads to use given the number of remaining cores
Expand Down Expand Up @@ -151,9 +149,5 @@ impl TaskPoolOptions {
.build()
});
}

{
MainThreadExecutor::init();
}
}
}
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ impl<Q: WorldQuery, F: ReadOnlyWorldQuery> QueryState<Q, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
ComputeTaskPool::get().scope(|scope| {
ComputeTaskPool::get().scope(None, |scope| {
if Q::IS_DENSE && F::IS_DENSE {
let tables = &world.storages().tables;
for table_id in &self.matched_table_ids {
Expand Down
32 changes: 28 additions & 4 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::sync::Arc;

use crate as bevy_ecs;
use crate::{
archetype::ArchetypeComponentId,
query::Access,
schedule::{ParallelSystemExecutor, SystemContainer},
system::Resource,
world::World,
};
use async_channel::{Receiver, Sender};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor};
#[cfg(feature = "trace")]
use bevy_utils::tracing::Instrument;
use event_listener::Event;
Expand All @@ -14,6 +18,22 @@ use fixedbitset::FixedBitSet;
#[cfg(test)]
use scheduling_event::*;

///
#[derive(Resource, Default)]
pub struct MainThreadExecutor(pub Arc<ThreadExecutor>);

impl MainThreadExecutor {
pub fn new() -> Self {
MainThreadExecutor(Arc::new(ThreadExecutor::new()))
}
}

impl Clone for MainThreadExecutor {
fn clone(&self) -> Self {
MainThreadExecutor(self.0.clone())
}
}

struct SystemSchedulingMetadata {
/// Used to signal the system's task to start the system.
start: Event,
Expand Down Expand Up @@ -124,7 +144,11 @@ impl ParallelSystemExecutor for ParallelExecutor {
}
}

ComputeTaskPool::init(TaskPool::default).scope(|scope| {
let thread_executor = world
.get_resource::<MainThreadExecutor>()
.map(|e| e.0.clone());

ComputeTaskPool::init(TaskPool::default).scope(thread_executor, |scope| {
self.prepare_systems(scope, systems, world);
if self.should_run.count_ones(..) == 0 {
return;
Expand Down Expand Up @@ -236,7 +260,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_on_main(task);
scope.spawn_on_scope(task);
}

#[cfg(test)]
Expand Down Expand Up @@ -271,7 +295,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_on_main(task);
scope.spawn_on_scope(task);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_gltf/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ async fn load_gltf<'a, 'b>(
} else {
#[cfg(not(target_arch = "wasm32"))]
IoTaskPool::get()
.scope(|scope| {
.scope(None, |scope| {
gltf.textures().for_each(|gltf_texture| {
let linear_textures = &linear_textures;
let load_context: &LoadContext = load_context;
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_tasks/examples/busy_behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn main() {
.build();

let t0 = instant::Instant::now();
pool.scope(|s| {
pool.scope(None, |s| {
for i in 0..40 {
s.spawn(async move {
let now = instant::Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_tasks/examples/idle_behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() {
.thread_name("Idle Behavior ThreadPool".to_string())
.build();

pool.scope(|s| {
pool.scope(None, |s| {
for i in 0..1 {
s.spawn(async move {
println!("Blocking for 10 seconds");
Expand Down
32 changes: 16 additions & 16 deletions crates/bevy_tasks/src/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ where
///
/// See [`Iterator::count()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.count)
fn count(mut self, pool: &TaskPool) -> usize {
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.count() });
}
Expand Down Expand Up @@ -105,7 +105,7 @@ where
where
F: FnMut(BatchIter::Item) + Send + Clone + Sync,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move {
Expand Down Expand Up @@ -195,7 +195,7 @@ where
C: std::iter::FromIterator<BatchIter::Item>,
BatchIter::Item: Send + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.collect::<Vec<_>>() });
}
Expand All @@ -216,7 +216,7 @@ where
BatchIter::Item: Send + 'static,
{
let (mut a, mut b) = <(C, C)>::default();
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.partition::<Vec<_>, F>(newf) });
Expand All @@ -242,7 +242,7 @@ where
F: FnMut(C, BatchIter::Item) -> C + Send + Sync + Clone,
C: Clone + Send + Sync + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
let newi = init.clone();
Expand All @@ -260,7 +260,7 @@ where
where
F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(mut batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.all(newf) });
Expand All @@ -279,7 +279,7 @@ where
where
F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(mut batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.any(newf) });
Expand All @@ -299,7 +299,7 @@ where
where
F: FnMut(BatchIter::Item) -> bool + Send + Sync + Clone,
{
let poses = pool.scope(|s| {
let poses = pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let mut newf = f.clone();
s.spawn(async move {
Expand Down Expand Up @@ -332,7 +332,7 @@ where
where
BatchIter::Item: Ord + Send + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.max() });
}
Expand All @@ -349,7 +349,7 @@ where
where
BatchIter::Item: Ord + Send + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.min() });
}
Expand All @@ -368,7 +368,7 @@ where
F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone,
BatchIter::Item: Send + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.max_by_key(newf) });
Expand All @@ -388,7 +388,7 @@ where
F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone,
BatchIter::Item: Send + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.max_by(newf) });
Expand All @@ -408,7 +408,7 @@ where
F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone,
BatchIter::Item: Send + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.min_by_key(newf) });
Expand All @@ -428,7 +428,7 @@ where
F: FnMut(&BatchIter::Item, &BatchIter::Item) -> std::cmp::Ordering + Send + Sync + Clone,
BatchIter::Item: Send + 'static,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
let newf = f.clone();
s.spawn(async move { batch.min_by(newf) });
Expand Down Expand Up @@ -482,7 +482,7 @@ where
S: std::iter::Sum<BatchIter::Item> + Send + 'static,
R: std::iter::Sum<S>,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.sum() });
}
Expand All @@ -499,7 +499,7 @@ where
S: std::iter::Product<BatchIter::Item> + Send + 'static,
R: std::iter::Product<S>,
{
pool.scope(|s| {
pool.scope(None, |s| {
while let Some(batch) = self.next_batch() {
s.spawn(async move { batch.product() });
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use usages::tick_global_task_pools_on_main_thread;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

mod main_thread_executor;
pub use main_thread_executor::MainThreadExecutor;
pub use main_thread_executor::ThreadExecutor;

mod iter;
pub use iter::ParallelIterator;
Expand Down
Loading

0 comments on commit 7186d73

Please sign in to comment.