diff --git a/lib/wasix/src/runtime/task_manager/thread.rs b/lib/wasix/src/runtime/task_manager/thread.rs deleted file mode 100644 index f33575657eb..00000000000 --- a/lib/wasix/src/runtime/task_manager/thread.rs +++ /dev/null @@ -1,235 +0,0 @@ -use std::pin::Pin; - -use futures::Future; -#[cfg(feature = "sys-thread")] -use tokio::runtime::{Builder, Runtime}; -use wasmer::{Module, Store}; - -use crate::{WasiCallingId, WasiThreadError}; - -use super::{SpawnType, VirtualTaskManager}; - -#[derive(Debug)] -pub struct ThreadTaskManager { - /// This is the tokio runtime used for ASYNC operations that is - /// used for non-javascript environments - #[cfg(feature = "sys-thread")] - runtime: std::sync::Arc, - pool: Arc, -} - -impl Drop -for ThreadTaskManager { - fn drop(&mut self) { - self.runtime.shutdown_timeout(Duration::from_secs(0)); - } -} - -impl Default for ThreadTaskManager { - #[cfg(feature = "sys-thread")] - fn default() -> Self { - let runtime: std::sync::Arc = - std::sync::Arc::new(Builder::new_current_thread().enable_all().build().unwrap()); - - let concurrency = std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(1).unwrap()) - .get(); - let max_threads = 200usize.max(concurrency * 100); - - Self { - runtime, - pool: Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(max_threads) - .build() - .unwrap(), - ), - } - } - - #[cfg(not(feature = "sys-thread"))] - fn default() -> Self { - let (tx, _) = tokio::sync::broadcast::channel(100); - - let concurrency = std::thread::available_parallelism() - .unwrap_or(NonZeroUsize::new(1).unwrap()) - .get(); - let max_threads = 200usize.max(concurrency * 100); - - Self { - periodic_wakers: Arc::new(Mutex::new((Vec::new(), tx))), - pool: Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(max_threads) - .build() - .unwrap(), - ), - } - } -} - -#[allow(unused_variables)] -#[cfg(not(feature = "sys-thread"))] -impl VirtualTaskManager for ThreadTaskManager { - /// Invokes whenever a WASM thread goes idle. In some runtimes (like singlethreaded - /// execution environments) they will need to do asynchronous work whenever the main - /// thread goes idle and this is the place to hook for that. - fn sleep_now( - &self, - id: WasiCallingId, - ms: u128, - ) -> Pin + Send + Sync + 'static>> { - if ms == 0 { - std::thread::yield_now(); - } else { - std::thread::sleep(std::time::Duration::from_millis(ms as u64)); - } - Box::pin(async move {}) - } - - /// Starts an asynchronous task that will run on a shared worker pool - /// This task must not block the execution or it could cause a deadlock - fn task_shared( - &self, - task: Box< - dyn FnOnce() -> Pin + Send + 'static>> + Send + 'static, - >, - ) -> Result<(), WasiThreadError> { - Err(WasiThreadError::Unsupported) - } - - /// Starts an asynchronous task on the local thread (by running it in a runtime) - fn block_on(&self, task: Pin>>) { - unimplemented!("asynchronous operations are not supported on this task manager"); - } - - /// Enters the task runtime - fn enter(&self) -> Box { - unimplemented!("asynchronous operations are not supported on this task manager"); - } - - /// Starts an asynchronous task will will run on a dedicated thread - /// pulled from the worker pool that has a stateful thread local variable - /// It is ok for this task to block execution and any async futures within its scope - fn task_wasm( - &self, - task: Box) + Send + 'static>, - store: Store, - module: Module, - spawn_type: SpawnType, - ) -> Result<(), WasiThreadError> { - Err(WasiThreadError::Unsupported) - } - - /// Starts an asynchronous task will will run on a dedicated thread - /// pulled from the worker pool. It is ok for this task to block execution - /// and any async futures within its scope - fn task_dedicated( - &self, - task: Box, - ) -> Result<(), WasiThreadError> { - Err(WasiThreadError::Unsupported) - } - - /// Returns the amount of parallelism that is possible on this platform - fn thread_parallelism(&self) -> Result { - Err(WasiThreadError::Unsupported) - } -} - -#[cfg(feature = "sys-thread")] -impl VirtualTaskManager for ThreadTaskManager { - /// See [`VirtualTaskManager::sleep_now`]. - fn sleep_now( - &self, - _id: WasiCallingId, - ms: u128, - ) -> Pin + Send + Sync + 'static>> { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - self.runtime.spawn(async move { - if time == Duration::ZERO { - tokio::task::yield_now().await; - } else { - tokio::time::sleep(time).await; - } - tx.send(()).ok(); - }); - Box::pin(async move { - rx.recv().await; - }) - } - - /// Starts an asynchronous task will will run on a dedicated thread - /// pulled from the worker pool that has a stateful thread local variable - /// It is ok for this task to block execution and any async futures within its scope - fn task_shared( - &self, - task: Box< - dyn FnOnce() -> Pin + Send + 'static>> + Send + 'static, - >, - ) -> Result<(), WasiThreadError> { - self.runtime.spawn(async move { - let fut = task(); - fut.await - }); - Ok(()) - } - - /// See [`VirtualTaskManager::block_on`]. - fn block_on<'a>(&self, task: Pin + 'a>>) { - let _guard = self.runtime.enter(); - self.runtime.block_on(async move { - task.await; - }); - } - - /// See [`VirtualTaskManager::enter`]. - fn enter<'a>(&'a self) -> Box { - Box::new(self.runtime.enter()) - } - - /// See [`VirtualTaskManager::enter`]. - fn task_wasm( - &self, - task: Box) + Send + 'static>, - store: Store, - module: Module, - spawn_type: SpawnType, - ) -> Result<(), WasiThreadError> { - let vm_memory: Option = match spawn_type { - SpawnType::CreateWithType(mem) => Some( - Memory::new(&mut store, mem.ty) - .map_err(|err| { - tracing::error!("failed to create memory - {}", err); - }) - .unwrap(), - ), - SpawnType::NewThread(mem) => Some(mem), - SpawnType::Create => None, - }; - - self.pool.spawn(move || { - // Invoke the callback - task(store, module, memory); - }); - Ok(()) - } - - /// See [`VirtualTaskManager::task_dedicated`]. - fn task_dedicated( - &self, - task: Box, - ) -> Result<(), WasiThreadError> { - self.pool.spawn(move || { - task(); - }); - Ok(()) - } - - /// See [`VirtualTaskManager::thread_parallelism`]. - fn thread_parallelism(&self) -> Result { - Ok(std::thread::available_parallelism() - .map(|a| usize::from(a)) - .unwrap_or(8)) - } -}