diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index c517ae245abed..47890a3adc50e 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,12 +1,11 @@ use std::{ future::Future, mem, - pin::Pin, sync::Arc, thread::{self, JoinHandle}, }; -use futures_lite::{future, pin, FutureExt}; +use futures_lite::{future, FutureExt}; use crate::{Task, TaskGroup}; @@ -303,8 +302,13 @@ impl TaskPool { let shutdown_rx = shutdown_rx.clone(); make_thread_builder(&builder, "Compute", i) .spawn(move || { + let future = async { + loop { + compute.tick().await; + } + }; // Use unwrap_err because we expect a Closed error - future::block_on(compute.run(shutdown_rx.recv())).unwrap_err(); + future::block_on(shutdown_rx.recv().or(future)).unwrap_err(); }) .expect("Failed to spawn thread.") }) @@ -322,7 +326,7 @@ impl TaskPool { } }; // Use unwrap_err because we expect a Closed error - future::block_on(future.or(shutdown_rx.recv())).unwrap_err(); + future::block_on(shutdown_rx.recv().or(future)).unwrap_err(); }) .expect("Failed to spawn thread.") }) @@ -341,7 +345,7 @@ impl TaskPool { } }; // Use unwrap_err because we expect a Closed error - future::block_on(future.or(shutdown_rx.recv())).unwrap_err(); + future::block_on(shutdown_rx.recv().or(future)).unwrap_err(); }) .expect("Failed to spawn thread.") }) @@ -436,12 +440,8 @@ impl TaskPool { f(&mut scope); - if scope.spawned.is_empty() { - Vec::default() - } else if scope.spawned.len() == 1 { - vec![future::block_on(&mut scope.spawned[0])] - } else { - let fut = async move { + future::block_on(async move { + let get_results = async move { let mut results = Vec::with_capacity(scope.spawned.len()); for task in scope.spawned { results.push(task.await); @@ -450,32 +450,14 @@ impl TaskPool { results }; - // Pin the futures on the stack. - pin!(fut); - - // SAFETY: This function blocks until all futures complete, so we do not read/write - // the data from futures outside of the 'scope lifetime. However, - // rust has no way of knowing this so we must convert to 'static - // here to appease the compiler as it is unable to validate safety. - let fut: Pin<&mut (dyn Future>)> = fut; - let fut: Pin<&'static mut (dyn Future> + 'static)> = - unsafe { mem::transmute(fut) }; - - // The thread that calls scope() will participate in driving tasks in the pool - // forward until the tasks that are spawned by this scope() call - // complete. (If the caller of scope() happens to be a thread in - // this thread pool, and we only have one thread in the pool, then - // simply calling future::block_on(spawned) would deadlock.) - let mut spawned = local_executor.spawn(fut); - loop { - if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { - break result; - }; - - executor.try_tick(); - local_executor.try_tick(); - } - } + let tick_forever = async move { + loop { + local_executor.tick().or(executor.tick()).await; + } + }; + + get_results.or(tick_forever).await + }) }) }