diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 06a0da456931a..98cae6d4d4b3a 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,6 +9,8 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [dependencies] +bevy_utils = { path = "../bevy_utils", version = "0.8.0-dev" } + futures-lite = "1.4.0" event-listener = "2.5.2" async-executor = "1.3.0" diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 7345d775ee968..1d0afbc001986 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -7,6 +7,9 @@ pub use slice::{ParallelSlice, ParallelSliceMut}; mod task; pub use task::Task; +mod pollable_task; +pub use pollable_task::PollableTask; + #[cfg(not(target_arch = "wasm32"))] mod task_pool; #[cfg(not(target_arch = "wasm32"))] diff --git a/crates/bevy_tasks/src/pollable_task.rs b/crates/bevy_tasks/src/pollable_task.rs new file mode 100644 index 0000000000000..d80e082eee0c9 --- /dev/null +++ b/crates/bevy_tasks/src/pollable_task.rs @@ -0,0 +1,33 @@ +use crate::Task; +use async_channel::{Receiver, TryRecvError}; + +/// A pollable task whose result readiness can be checked in system functions +/// on every frame update without blocking on a future +#[derive(Debug)] +pub struct PollableTask { + receiver: Receiver, + // this is to keep the task alive + _task: Task<()>, +} + +impl PollableTask { + pub(crate) fn new(receiver: Receiver, task: Task<()>) -> Self { + Self { + receiver, + _task: task, + } + } + + /// poll to see whether the task finished + pub fn poll(&self) -> Option { + match self.receiver.try_recv() { + Ok(value) => Some(value), + Err(try_error) => match try_error { + TryRecvError::Empty => None, + TryRecvError::Closed => { + panic!("Polling on the task failed because the connection was already closed.") + } + }, + } + } +} diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 1d0f86e7cb5ed..99bc4631767ea 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,4 +1,6 @@ +use async_channel::bounded; use std::{ + any::type_name, future::Future, mem, pin::Pin, @@ -6,9 +8,10 @@ use std::{ thread::{self, JoinHandle}, }; +use bevy_utils::tracing::error; use futures_lite::{future, pin}; -use crate::Task; +use crate::{PollableTask, Task}; /// Used to create a [`TaskPool`] #[derive(Debug, Default, Clone)] @@ -228,6 +231,28 @@ impl TaskPool { Task::new(self.executor.spawn(future)) } + /// Spawns a static future onto the thread pool. The returned `PollableTask` is not a future, + /// but can be polled in system functions on every frame update without being blocked on + pub fn spawn_pollable(&self, future: F) -> PollableTask + where + F: Future + Send + 'static, + T: Send + Sync + 'static, + { + let (sender, receiver) = bounded(1); + let task = self.spawn(async move { + let result = future.await; + match sender.send(result).await { + Ok(()) => {} + Err(_) => error!( + "Sending result for future {future_name} (`Future`) failed, because the receiving `PollableTask` was dropped", + future_name=type_name::(), + return_name=type_name::(), + ), + } + }); + PollableTask::new(receiver, task) + } + /// Spawns a static future on the thread-local async executor for the current thread. The task /// will run entirely on the thread the task was spawned on. The returned Task is a future. /// It can also be cancelled and "detached" allowing it to continue running without having @@ -301,9 +326,13 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { #[allow(clippy::blacklisted_name)] mod tests { use super::*; - use std::sync::{ - atomic::{AtomicBool, AtomicI32, Ordering}, - Barrier, + use std::{ + ops::Range, + sync::{ + atomic::{AtomicBool, AtomicI32, Ordering}, + Barrier, + }, + time::Duration, }; #[test] @@ -405,7 +434,7 @@ mod tests { scope.spawn_local(async move { inner_count_clone.fetch_add(1, Ordering::Release); if std::thread::current().id() != spawner { - // NOTE: This check is using an atomic rather than simply panicing the + // NOTE: This check is using an atomic rather than simply panicking the // thread to avoid deadlocking the barrier on failure inner_thread_check_failed.store(true, Ordering::Release); } @@ -418,4 +447,31 @@ mod tests { assert!(!thread_check_failed.load(Ordering::Acquire)); assert_eq!(count.load(Ordering::Acquire), 200); } + + #[test] + fn test_spawn_pollable() { + let transform_fn = |i| i + 1; + + let pool = TaskPool::new(); + let nums: Range = 0..10; + + let pollable_tasks = nums + .clone() + .into_iter() + .map(|i| pool.spawn_pollable(async move { transform_fn(i) })) + .collect::>(); + + for _ in 0..100 { + for (pollable_task, number) in pollable_tasks.iter().zip(nums.clone()) { + match pollable_task.poll() { + None => continue, + Some(actual) => assert_eq!(transform_fn(number), actual), + } + return; + } + + std::thread::sleep(Duration::from_secs_f32(1. / 100.)); + } + panic!("Tasks did not finish in time."); + } }