From c8375d38e5c68e06ecf147e1a22bf42496220c48 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Wed, 26 Jul 2023 22:27:53 +1000 Subject: [PATCH 1/4] Added a forced shutdown on tokio runtimes as the STDIN blocks the shutdown --- lib/c-api/src/wasm_c_api/wasi/mod.rs | 6 +-- lib/cli/src/commands/run.rs | 6 +-- lib/cli/src/commands/run/wasi.rs | 4 +- lib/wasix/src/os/console/mod.rs | 7 +-- lib/wasix/src/runtime/task_manager/thread.rs | 7 +++ lib/wasix/src/runtime/task_manager/tokio.rs | 56 ++++++++++++++++---- tests/lib/wast/src/wasi_wast.rs | 2 +- 7 files changed, 66 insertions(+), 22 deletions(-) diff --git a/lib/c-api/src/wasm_c_api/wasi/mod.rs b/lib/c-api/src/wasm_c_api/wasi/mod.rs index b08bf870fb6..7e8c5008d51 100644 --- a/lib/c-api/src/wasm_c_api/wasi/mod.rs +++ b/lib/c-api/src/wasm_c_api/wasi/mod.rs @@ -275,7 +275,7 @@ fn prepare_webc_env( let handle = runtime.handle().clone(); let _guard = handle.enter(); - let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(handle))); + let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(runtime))); rt.set_engine(Some(store_mut.engine().clone())); let slice = unsafe { std::slice::from_raw_parts(bytes, len) }; @@ -324,7 +324,6 @@ pub struct wasi_env_t { /// cbindgen:ignore pub(super) inner: WasiFunctionEnv, pub(super) store: StoreRef, - pub(super) _runtime: tokio::runtime::Runtime, } /// Create a new WASI environment. @@ -349,7 +348,7 @@ pub unsafe extern "C" fn wasi_env_new( let handle = runtime.handle().clone(); let _guard = handle.enter(); - let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(handle))); + let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(runtime))); rt.set_engine(Some(store_mut.engine().clone())); if !config.inherit_stdout { @@ -370,7 +369,6 @@ pub unsafe extern "C" fn wasi_env_new( Some(Box::new(wasi_env_t { inner: env, store: store.clone(), - _runtime: runtime, })) } diff --git a/lib/cli/src/commands/run.rs b/lib/cli/src/commands/run.rs index 355eedf0b95..e1f0e4ec503 100644 --- a/lib/cli/src/commands/run.rs +++ b/lib/cli/src/commands/run.rs @@ -105,9 +105,9 @@ impl Run { let _guard = handle.enter(); let (store, _) = self.store.get_store()?; - let runtime = - self.wasi - .prepare_runtime(store.engine().clone(), &self.env, handle.clone())?; + let runtime = self + .wasi + .prepare_runtime(store.engine().clone(), &self.env, runtime)?; // This is a slow operation, so let's temporarily wrap the runtime with // something that displays progress diff --git a/lib/cli/src/commands/run/wasi.rs b/lib/cli/src/commands/run/wasi.rs index 3a887b584d1..5f74b6ddaff 100644 --- a/lib/cli/src/commands/run/wasi.rs +++ b/lib/cli/src/commands/run/wasi.rs @@ -251,9 +251,9 @@ impl Wasi { &self, engine: Engine, env: &WasmerEnv, - handle: Handle, + runtime: tokio::runtime::Runtime, ) -> Result { - let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(handle))); + let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(runtime))); if self.networking { rt.set_networking_implementation(virtual_net::host::LocalNetworking::default()); diff --git a/lib/wasix/src/os/console/mod.rs b/lib/wasix/src/os/console/mod.rs index 3700fcd71ea..66e19cfa88a 100644 --- a/lib/wasix/src/os/console/mod.rs +++ b/lib/wasix/src/os/console/mod.rs @@ -292,9 +292,10 @@ mod tests { #[cfg_attr(not(feature = "host-reqwest"), ignore = "Requires a HTTP client")] fn test_console_dash_tty_with_args_and_env() { let tokio_rt = tokio::runtime::Runtime::new().unwrap(); - let _guard = tokio_rt.handle().enter(); + let rt_handle = tokio_rt.handle().clone(); + let _guard = rt_handle.enter(); - let tm = TokioTaskManager::new(tokio_rt.handle().clone()); + let tm = TokioTaskManager::new(tokio_rt); let mut rt = PluggableRuntime::new(Arc::new(tm)); rt.set_engine(Some(wasmer::Engine::default())) .set_package_loader(BuiltinPackageLoader::from_env().unwrap()); @@ -316,7 +317,7 @@ mod tests { .run() .unwrap(); - let code = tokio_rt + let code = rt_handle .block_on(async move { virtual_fs::AsyncWriteExt::write_all( &mut stdin_tx, diff --git a/lib/wasix/src/runtime/task_manager/thread.rs b/lib/wasix/src/runtime/task_manager/thread.rs index b330850b9c6..f33575657eb 100644 --- a/lib/wasix/src/runtime/task_manager/thread.rs +++ b/lib/wasix/src/runtime/task_manager/thread.rs @@ -18,6 +18,13 @@ pub struct ThreadTaskManager { pool: Arc, } +impl Drop +for ThreadTaskManager { + fn drop(&mut self) { + self.runtime.shutdown_timeout(Duration::from_secs(0)); + } +} + impl Default for ThreadTaskManager { #[cfg(feature = "sys-thread")] fn default() -> Self { diff --git a/lib/wasix/src/runtime/task_manager/tokio.rs b/lib/wasix/src/runtime/task_manager/tokio.rs index d7cad4789c6..89bae085de5 100644 --- a/lib/wasix/src/runtime/task_manager/tokio.rs +++ b/lib/wasix/src/runtime/task_manager/tokio.rs @@ -1,28 +1,66 @@ use std::{num::NonZeroUsize, pin::Pin, sync::Arc, time::Duration}; +use std::sync::Mutex; use futures::{future::BoxFuture, Future}; -use tokio::runtime::Handle; +use tokio::runtime::{Handle, Runtime}; use crate::{os::task::thread::WasiThreadError, WasiFunctionEnv}; use super::{TaskWasm, TaskWasmRunProperties, VirtualTaskManager}; +#[derive(Debug, Clone)] +pub enum RuntimeOrHandle { + Handle(Handle), + Runtime(Handle, Arc>>), +} +impl From +for RuntimeOrHandle { + fn from(value: Handle) -> Self { + Self::Handle(value) + } +} +impl From +for RuntimeOrHandle { + fn from(value: Runtime) -> Self { + Self::Runtime(value.handle().clone(), Arc::new(Mutex::new(Some(value)))) + } +} + +impl Drop +for RuntimeOrHandle { + fn drop(&mut self) { + if let Self::Runtime(_, runtime) = self { + runtime.lock().unwrap().take().map(|h| h.shutdown_timeout(Duration::from_secs(0))); + } + } +} + +impl RuntimeOrHandle { + pub fn handle(&self) -> &Handle { + match self { + Self::Handle(h) => h, + Self::Runtime(h, _) => h + } + } +} + /// A task manager that uses tokio to spawn tasks. #[derive(Clone, Debug)] pub struct TokioTaskManager { - handle: Handle, + rt: RuntimeOrHandle, pool: Arc, } impl TokioTaskManager { - pub fn new(rt: Handle) -> Self { + pub fn new(rt: I) -> Self + where I: Into { let concurrency = std::thread::available_parallelism() .unwrap_or(NonZeroUsize::new(1).unwrap()) .get(); let max_threads = 200usize.max(concurrency * 100); Self { - handle: rt, + rt: rt.into(), pool: Arc::new( rayon::ThreadPoolBuilder::new() .num_threads(max_threads) @@ -33,13 +71,13 @@ impl TokioTaskManager { } pub fn runtime_handle(&self) -> tokio::runtime::Handle { - self.handle.clone() + self.rt.handle().clone() } } impl Default for TokioTaskManager { fn default() -> Self { - Self::new(Handle::current()) + Self::new(Handle::current().clone()) } } @@ -55,7 +93,7 @@ impl VirtualTaskManager for TokioTaskManager { /// See [`VirtualTaskManager::sleep_now`]. fn sleep_now(&self, time: Duration) -> Pin + Send + Sync>> { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - self.handle.spawn(async move { + self.rt.handle().spawn(async move { if time == Duration::ZERO { tokio::task::yield_now().await; } else { @@ -73,7 +111,7 @@ impl VirtualTaskManager for TokioTaskManager { &self, task: Box BoxFuture<'static, ()> + Send + 'static>, ) -> Result<(), WasiThreadError> { - self.handle.spawn(async move { + self.rt.handle().spawn(async move { let fut = task(); fut.await }); @@ -99,7 +137,7 @@ impl VirtualTaskManager for TokioTaskManager { let trigger = trigger(); let pool = self.pool.clone(); - self.handle.spawn(async move { + self.rt.handle().spawn(async move { let result = trigger.await; // Build the task that will go on the callback pool.spawn(move || { diff --git a/tests/lib/wast/src/wasi_wast.rs b/tests/lib/wast/src/wasi_wast.rs index e5d9d61f1c0..8354ea775da 100644 --- a/tests/lib/wast/src/wasi_wast.rs +++ b/tests/lib/wast/src/wasi_wast.rs @@ -106,7 +106,7 @@ impl<'a> WasiTest<'a> { #[cfg(not(target_arch = "wasm32"))] let _guard = handle.enter(); #[cfg(not(target_arch = "wasm32"))] - let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(handle))); + let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(runtime))); #[cfg(target_arch = "wasm32")] let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::default())); rt.set_engine(Some(store.engine().clone())); From 218e7d85ec1ebafd6cda679e42058749c29682e9 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Wed, 26 Jul 2023 22:35:54 +1000 Subject: [PATCH 2/4] Linting fixes --- lib/wasix/src/runtime/task_manager/tokio.rs | 23 ++++++++++++--------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/lib/wasix/src/runtime/task_manager/tokio.rs b/lib/wasix/src/runtime/task_manager/tokio.rs index 89bae085de5..aff3e1e07e8 100644 --- a/lib/wasix/src/runtime/task_manager/tokio.rs +++ b/lib/wasix/src/runtime/task_manager/tokio.rs @@ -1,5 +1,5 @@ -use std::{num::NonZeroUsize, pin::Pin, sync::Arc, time::Duration}; use std::sync::Mutex; +use std::{num::NonZeroUsize, pin::Pin, sync::Arc, time::Duration}; use futures::{future::BoxFuture, Future}; use tokio::runtime::{Handle, Runtime}; @@ -13,24 +13,25 @@ pub enum RuntimeOrHandle { Handle(Handle), Runtime(Handle, Arc>>), } -impl From -for RuntimeOrHandle { +impl From for RuntimeOrHandle { fn from(value: Handle) -> Self { Self::Handle(value) } } -impl From -for RuntimeOrHandle { +impl From for RuntimeOrHandle { fn from(value: Runtime) -> Self { Self::Runtime(value.handle().clone(), Arc::new(Mutex::new(Some(value)))) } } -impl Drop -for RuntimeOrHandle { +impl Drop for RuntimeOrHandle { fn drop(&mut self) { if let Self::Runtime(_, runtime) = self { - runtime.lock().unwrap().take().map(|h| h.shutdown_timeout(Duration::from_secs(0))); + runtime + .lock() + .unwrap() + .take() + .map(|h| h.shutdown_timeout(Duration::from_secs(0))); } } } @@ -39,7 +40,7 @@ impl RuntimeOrHandle { pub fn handle(&self) -> &Handle { match self { Self::Handle(h) => h, - Self::Runtime(h, _) => h + Self::Runtime(h, _) => h, } } } @@ -53,7 +54,9 @@ pub struct TokioTaskManager { impl TokioTaskManager { pub fn new(rt: I) -> Self - where I: Into { + where + I: Into, + { let concurrency = std::thread::available_parallelism() .unwrap_or(NonZeroUsize::new(1).unwrap()) .get(); From 405a4e6a3ee479622f79b65c2cd88d6a65ba9205 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Wed, 26 Jul 2023 22:46:23 +1000 Subject: [PATCH 3/4] More lint fixes and a compilation fix for c_api --- lib/c-api/src/wasm_c_api/wasi/mod.rs | 7 +++---- lib/wasix/src/runtime/task_manager/tokio.rs | 10 ++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/c-api/src/wasm_c_api/wasi/mod.rs b/lib/c-api/src/wasm_c_api/wasi/mod.rs index 7e8c5008d51..508293dcb93 100644 --- a/lib/c-api/src/wasm_c_api/wasi/mod.rs +++ b/lib/c-api/src/wasm_c_api/wasi/mod.rs @@ -233,7 +233,7 @@ unsafe fn wasi_env_with_filesystem_inner( let module = &module.as_ref()?.inner; let imports = imports?; - let (wasi_env, import_object, runtime) = prepare_webc_env( + let (wasi_env, import_object) = prepare_webc_env( config, &mut store.store_mut(), module, @@ -247,7 +247,6 @@ unsafe fn wasi_env_with_filesystem_inner( Some(Box::new(wasi_env_t { inner: wasi_env, store: store.clone(), - _runtime: runtime, })) } @@ -259,7 +258,7 @@ fn prepare_webc_env( bytes: &'static u8, len: usize, package_name: &str, -) -> Option<(WasiFunctionEnv, Imports, tokio::runtime::Runtime)> { +) -> Option<(WasiFunctionEnv, Imports)> { use virtual_fs::static_fs::StaticFileSystem; use webc::v1::{FsEntryType, WebC}; @@ -316,7 +315,7 @@ fn prepare_webc_env( let env = builder.finalize(store).ok()?; let import_object = env.import_object(store, module).ok()?; - Some((env, import_object, runtime)) + Some((env, import_object)) } #[allow(non_camel_case_types)] diff --git a/lib/wasix/src/runtime/task_manager/tokio.rs b/lib/wasix/src/runtime/task_manager/tokio.rs index aff3e1e07e8..f7b0dc1d5d5 100644 --- a/lib/wasix/src/runtime/task_manager/tokio.rs +++ b/lib/wasix/src/runtime/task_manager/tokio.rs @@ -27,11 +27,9 @@ impl From for RuntimeOrHandle { impl Drop for RuntimeOrHandle { fn drop(&mut self) { if let Self::Runtime(_, runtime) = self { - runtime - .lock() - .unwrap() - .take() - .map(|h| h.shutdown_timeout(Duration::from_secs(0))); + if let Some(h) = runtime.lock().unwrap().take() { + h.shutdown_timeout(Duration::from_secs(0)) + } } } } @@ -80,7 +78,7 @@ impl TokioTaskManager { impl Default for TokioTaskManager { fn default() -> Self { - Self::new(Handle::current().clone()) + Self::new(Handle::current()) } } From b31fc2e7b709fa5f5c4cd8ca07f3c33174b0cc29 Mon Sep 17 00:00:00 2001 From: Johnathan Sharratt Date: Wed, 26 Jul 2023 23:02:40 +1000 Subject: [PATCH 4/4] Made prepare_runtime backwards compatible --- lib/cli/src/commands/run/wasi.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/lib/cli/src/commands/run/wasi.rs b/lib/cli/src/commands/run/wasi.rs index 5f74b6ddaff..dc8c13e7e02 100644 --- a/lib/cli/src/commands/run/wasi.rs +++ b/lib/cli/src/commands/run/wasi.rs @@ -28,7 +28,10 @@ use wasmer_wasix::{ FileSystemSource, InMemorySource, MultiSource, PackageSpecifier, Source, WapmSource, WebSource, }, - task_manager::{tokio::TokioTaskManager, VirtualTaskManagerExt}, + task_manager::{ + tokio::{RuntimeOrHandle, TokioTaskManager}, + VirtualTaskManagerExt, + }, }, types::__WASI_STDIN_FILENO, wasmer_wasix_types::wasi::Errno, @@ -247,13 +250,16 @@ impl Wasi { caps } - pub fn prepare_runtime( + pub fn prepare_runtime( &self, engine: Engine, env: &WasmerEnv, - runtime: tokio::runtime::Runtime, - ) -> Result { - let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(runtime))); + rt_or_handle: I, + ) -> Result + where + I: Into, + { + let mut rt = PluggableRuntime::new(Arc::new(TokioTaskManager::new(rt_or_handle.into()))); if self.networking { rt.set_networking_implementation(virtual_net::host::LocalNetworking::default());