diff --git a/Cargo.lock b/Cargo.lock index d8e17cef1c7..006149241c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5726,6 +5726,7 @@ dependencies = [ "cfg-if 1.0.0", "chrono", "cooked-waker", + "dashmap", "derivative", "futures", "getrandom", diff --git a/lib/api/src/externals/function.rs b/lib/api/src/externals/function.rs index 734a3f8a866..48ac7eff2a8 100644 --- a/lib/api/src/externals/function.rs +++ b/lib/api/src/externals/function.rs @@ -60,7 +60,7 @@ mod private { //! Sealing the HostFunctionKind because it shouldn't be implemented //! by any type outside. //! See: - //! https://rust-lang.github.io/api-guidelines/future-proofing.html#c-sealed + //! pub trait HostFunctionKindSealed {} impl HostFunctionKindSealed for super::WithEnv {} impl HostFunctionKindSealed for super::WithoutEnv {} diff --git a/lib/cli/src/commands/run/wasi.rs b/lib/cli/src/commands/run/wasi.rs index 6a44a0a81ea..786a02978ed 100644 --- a/lib/cli/src/commands/run/wasi.rs +++ b/lib/cli/src/commands/run/wasi.rs @@ -14,6 +14,7 @@ use wasmer_wasix::{ os::{tty_sys::SysTty, TtyBridge}, runners::MappedDirectory, runtime::{ + module_cache::{FileSystemCache, ModuleCache}, resolver::{PackageResolver, RegistryResolver}, task_manager::tokio::TokioTaskManager, }, @@ -220,12 +221,17 @@ impl Wasi { rt.set_tty(tty); } - rt.set_engine(Some(engine)); + let wasmer_home = WasmerConfig::get_wasmer_dir().map_err(anyhow::Error::msg)?; let resolver = self - .prepare_resolver() + .prepare_resolver(&wasmer_home) .context("Unable to prepare the package resolver")?; - rt.set_resolver(resolver); + let module_cache = wasmer_wasix::runtime::module_cache::in_memory() + .and_then(FileSystemCache::new(wasmer_home.join("compiled"))); + + rt.set_resolver(resolver) + .set_module_cache(module_cache) + .set_engine(Some(engine)); Ok(rt) } @@ -273,8 +279,8 @@ impl Wasi { }) } - fn prepare_resolver(&self) -> Result { - let mut resolver = wapm_resolver()?; + fn prepare_resolver(&self, wasmer_home: &Path) -> Result { + let mut resolver = wapm_resolver(wasmer_home)?; for path in &self.include_webcs { let pkg = preload_webc(path) @@ -286,14 +292,13 @@ impl Wasi { } } -fn wapm_resolver() -> Result { +fn wapm_resolver(wasmer_home: &Path) -> Result { // FIXME(Michael-F-Bryan): Ideally, all of this would in the // RegistryResolver::from_env() constructor, but we don't want to add // wasmer-registry as a dependency of wasmer-wasix just yet. - let wasmer_home = WasmerConfig::get_wasmer_dir().map_err(anyhow::Error::msg)?; - let cache_dir = wasmer_registry::get_webc_dir(&wasmer_home); + let cache_dir = wasmer_registry::get_webc_dir(wasmer_home); let config = - wasmer_registry::WasmerConfig::from_file(&wasmer_home).map_err(anyhow::Error::msg)?; + wasmer_registry::WasmerConfig::from_file(wasmer_home).map_err(anyhow::Error::msg)?; let registry = config.registry.get_graphql_url(); let registry = registry diff --git a/lib/cli/src/commands/run_unstable.rs b/lib/cli/src/commands/run_unstable.rs index 0a545a9154e..e5cc1df9c84 100644 --- a/lib/cli/src/commands/run_unstable.rs +++ b/lib/cli/src/commands/run_unstable.rs @@ -454,17 +454,24 @@ impl TargetOnDisk { let leading_bytes = &buffer[..bytes_read]; if wasmer::is_wasm(leading_bytes) { - Ok(TargetOnDisk::WebAssemblyBinary(path)) - } else if webc::detect(leading_bytes).is_ok() { - Ok(TargetOnDisk::Webc(path)) - } else if path.extension() == Some("wat".as_ref()) { - Ok(TargetOnDisk::Wat(path)) - } else { - #[cfg(feature = "compiler")] - if ArtifactBuild::is_deserializable(leading_bytes) { - return Ok(TargetOnDisk::Artifact(path)); - } - anyhow::bail!("Unable to determine how to execute \"{}\"", path.display()); + return Ok(TargetOnDisk::WebAssemblyBinary(path)); + } + + if webc::detect(leading_bytes).is_ok() { + return Ok(TargetOnDisk::Webc(path)); + } + + #[cfg(feature = "compiler")] + if ArtifactBuild::is_deserializable(leading_bytes) { + return Ok(TargetOnDisk::Artifact(path)); + } + + // If we can't figure out the file type based on its content, fall back + // to checking the extension. + + match path.extension().and_then(|s| s.to_str()) { + Some("wat") => Ok(TargetOnDisk::Wat(path)), + _ => anyhow::bail!("Unable to determine how to execute \"{}\"", path.display()), } } diff --git a/lib/compiler/src/engine/inner.rs b/lib/compiler/src/engine/inner.rs index 3b89c1c7662..5424966fdd1 100644 --- a/lib/compiler/src/engine/inner.rs +++ b/lib/compiler/src/engine/inner.rs @@ -220,7 +220,7 @@ impl Engine { /// /// # Safety /// - /// See [`crate::Module::deserialize_from_file`]. + /// See [`Artifact::deserialize`]. #[cfg(not(target_arch = "wasm32"))] pub unsafe fn deserialize_from_file( &self, diff --git a/lib/types/src/serialize.rs b/lib/types/src/serialize.rs index 2c294dc2718..4528b9b1c2a 100644 --- a/lib/types/src/serialize.rs +++ b/lib/types/src/serialize.rs @@ -207,7 +207,7 @@ pub struct MetadataHeader { impl MetadataHeader { /// Current ABI version. Increment this any time breaking changes are made /// to the format of the serialized data. - const CURRENT_VERSION: u32 = 4; + pub const CURRENT_VERSION: u32 = 4; /// Magic number to identify wasmer metadata. const MAGIC: [u8; 8] = *b"WASMER\0\0"; diff --git a/lib/types/src/stack/frame.rs b/lib/types/src/stack/frame.rs index e6cb53062a4..989b0fc86ea 100644 --- a/lib/types/src/stack/frame.rs +++ b/lib/types/src/stack/frame.rs @@ -1,13 +1,9 @@ use crate::SourceLoc; -/// Description of a frame in a backtrace for a [`RuntimeError`](crate::RuntimeError). +/// Description of a frame in a backtrace. /// -/// Whenever a WebAssembly trap occurs an instance of [`RuntimeError`] -/// is created. Each [`RuntimeError`] has a backtrace of the -/// WebAssembly frames that led to the trap, and each frame is -/// described by this structure. -/// -/// [`RuntimeError`]: crate::RuntimeError +/// Each runtime error includes a backtrace of the WebAssembly frames that led +/// to the trap, and each frame is described by this structure. #[derive(Debug, Clone)] pub struct FrameInfo { /// The name of the module diff --git a/lib/vm/src/instance/mod.rs b/lib/vm/src/instance/mod.rs index 0909e6927f5..52b0a9e0f00 100644 --- a/lib/vm/src/instance/mod.rs +++ b/lib/vm/src/instance/mod.rs @@ -989,7 +989,7 @@ impl Drop for VMInstance { } impl VMInstance { - /// Create a new `VMInstance` pointing at a new [`InstanceRef`]. + /// Create a new `VMInstance` pointing at a new [`Instance`]. /// /// # Safety /// diff --git a/lib/vm/src/vmcontext.rs b/lib/vm/src/vmcontext.rs index f87df89c471..9e3fa240114 100644 --- a/lib/vm/src/vmcontext.rs +++ b/lib/vm/src/vmcontext.rs @@ -272,7 +272,7 @@ pub struct VMGlobalImport { /// # Safety /// This data is safe to share between threads because it's plain data that /// is the user's responsibility to synchronize. Additionally, all operations -/// on `from` are thread-safe through the use of a mutex in [`Global`]. +/// on `from` are thread-safe through the use of a mutex in [`VMGlobal`]. unsafe impl Send for VMGlobalImport {} /// # Safety /// This data is safe to share between threads because it's plain data that diff --git a/lib/wasi/Cargo.toml b/lib/wasi/Cargo.toml index c82aadfafcf..1142bffc2b5 100644 --- a/lib/wasi/Cargo.toml +++ b/lib/wasi/Cargo.toml @@ -53,6 +53,9 @@ wai-bindgen-wasmer = { path = "../wai-bindgen-wasmer", version = "0.4.0", featur heapless = "0.7.16" once_cell = "1.17.0" pin-project = "1.0.12" +semver = "1.0.17" +dashmap = "5.4.0" +tempfile = "3.4.0" # Used by the WCGI runner hyper = { version = "0.14", features = ["server", "stream"], optional = true } wcgi = { version = "0.1.2", optional = true } @@ -60,7 +63,6 @@ wcgi-host = { version = "0.1.2", optional = true } tower-http = { version = "0.4.0", features = ["trace", "util", "catch-panic", "cors"], optional = true } tower = { version = "0.4.13", features = ["make", "util"], optional = true } url = "2.3.1" -semver = "1.0.17" [target.'cfg(not(target_arch = "riscv64"))'.dependencies.reqwest] version = "0.11" @@ -89,7 +91,6 @@ wasm-bindgen = "0.2.74" [dev-dependencies] wasmer = { path = "../api", version = "=3.3.0", default-features = false, features = ["wat", "js-serializable-module"] } tokio = { version = "1", features = [ "sync", "macros", "rt" ], default_features = false } -tempfile = "3.4.0" [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = "0.3.0" diff --git a/lib/wasi/src/bin_factory/binary_package.rs b/lib/wasi/src/bin_factory/binary_package.rs index 19f32738d47..8a3e05b7cc9 100644 --- a/lib/wasi/src/bin_factory/binary_package.rs +++ b/lib/wasi/src/bin_factory/binary_package.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use derivative::*; use once_cell::sync::OnceCell; @@ -6,7 +6,7 @@ use semver::Version; use virtual_fs::FileSystem; use webc::compat::SharedBytes; -use super::hash_of_binary; +use crate::runtime::module_cache::ModuleHash; #[derive(Derivative, Clone)] #[derivative(Debug)] @@ -14,7 +14,7 @@ pub struct BinaryPackageCommand { name: String, #[derivative(Debug = "ignore")] pub(crate) atom: SharedBytes, - hash: OnceCell, + hash: OnceCell, } impl BinaryPackageCommand { @@ -38,15 +38,16 @@ impl BinaryPackageCommand { &self.atom } - pub fn hash(&self) -> &str { - self.hash.get_or_init(|| hash_of_binary(self.atom())) + pub fn hash(&self) -> &ModuleHash { + self.hash.get_or_init(|| ModuleHash::sha256(self.atom())) } } /// A WebAssembly package that has been loaded into memory. /// -/// You can crate a [`BinaryPackage`] using [`crate::bin_factory::ModuleCache`] -/// or [`crate::wapm::parse_static_webc()`]. +/// You can crate a [`BinaryPackage`] using a +/// [`crate::runtime::resolver::PackageResolver`] or +/// [`crate::wapm::parse_static_webc()`]. #[derive(Derivative, Clone)] #[derivative(Debug)] pub struct BinaryPackage { @@ -54,7 +55,7 @@ pub struct BinaryPackage { pub when_cached: Option, #[derivative(Debug = "ignore")] pub entry: Option, - pub hash: Arc>>, + pub hash: OnceCell, pub webc_fs: Option>, pub commands: Arc>>, pub uses: Vec, @@ -64,15 +65,13 @@ pub struct BinaryPackage { } impl BinaryPackage { - pub fn hash(&self) -> String { - let mut hash = self.hash.lock().unwrap(); - if hash.is_none() { + pub fn hash(&self) -> ModuleHash { + *self.hash.get_or_init(|| { if let Some(entry) = self.entry.as_ref() { - hash.replace(hash_of_binary(entry.as_ref())); + ModuleHash::sha256(entry) } else { - hash.replace(hash_of_binary(&self.package_name)); + ModuleHash::sha256(self.package_name.as_bytes()) } - } - hash.as_ref().unwrap().clone() + }) } } diff --git a/lib/wasi/src/bin_factory/exec.rs b/lib/wasi/src/bin_factory/exec.rs index 62a4becdacd..a2986cb196e 100644 --- a/lib/wasi/src/bin_factory/exec.rs +++ b/lib/wasi/src/bin_factory/exec.rs @@ -2,6 +2,7 @@ use std::{pin::Pin, sync::Arc}; use crate::{ os::task::{thread::WasiThreadRunGuard, TaskJoinHandle}, + runtime::module_cache::ModuleCache, VirtualBusError, WasiRuntimeError, }; use futures::Future; @@ -9,24 +10,25 @@ use tracing::*; use wasmer::{FunctionEnvMut, Instance, Memory, Module, Store}; use wasmer_wasix_types::wasi::Errno; -use super::{BinFactory, BinaryPackage, ModuleCache}; +use super::{BinFactory, BinaryPackage}; use crate::{ import_object_for_all_wasi_versions, runtime::SpawnType, SpawnedMemory, WasiEnv, WasiFunctionEnv, WasiRuntime, }; -pub fn spawn_exec( +#[tracing::instrument(level = "trace", skip_all, fields(%name, %binary.package_name))] +pub async fn spawn_exec( binary: BinaryPackage, name: &str, store: Store, env: WasiEnv, runtime: &Arc, - compiled_modules: &ModuleCache, ) -> Result { - // The deterministic id for this engine - let compiler = store.engine().deterministic_id(); + let key = binary.hash(); + + let compiled_modules = runtime.module_cache(); + let module = compiled_modules.load(key, store.engine()).await.ok(); - let module = compiled_modules.get_compiled_module(&store, binary.hash().as_str(), compiler); let module = match (module, binary.entry.as_ref()) { (Some(a), _) => a, (None, Some(entry)) => { @@ -43,7 +45,15 @@ pub fn spawn_exec( env.blocking_cleanup(Some(Errno::Noexec.into())); } let module = module?; - compiled_modules.set_compiled_module(binary.hash().as_str(), compiler, &module); + + if let Err(e) = compiled_modules.save(key, store.engine(), &module).await { + tracing::debug!( + %key, + package_name=%binary.package_name, + error=&e as &dyn std::error::Error, + "Unable to save the compiled module", + ); + } module } (None, None) => { @@ -205,14 +215,7 @@ impl BinFactory { let binary = binary?; // Execute - spawn_exec( - binary, - name.as_str(), - store, - env, - &self.runtime, - &self.cache, - ) + spawn_exec(binary, name.as_str(), store, env, &self.runtime).await }) } diff --git a/lib/wasi/src/bin_factory/mod.rs b/lib/wasi/src/bin_factory/mod.rs index 41870e3884a..54dcce4a4e5 100644 --- a/lib/wasi/src/bin_factory/mod.rs +++ b/lib/wasi/src/bin_factory/mod.rs @@ -10,14 +10,10 @@ use virtual_fs::{AsyncReadExt, FileSystem}; mod binary_package; mod exec; -mod module_cache; - -use sha2::*; pub use self::{ binary_package::*, exec::{spawn_exec, spawn_exec_module}, - module_cache::ModuleCache, }; use crate::{os::command::Commands, WasiRuntime}; @@ -25,19 +21,14 @@ use crate::{os::command::Commands, WasiRuntime}; pub struct BinFactory { pub(crate) commands: Commands, runtime: Arc, - pub(crate) cache: Arc, pub(crate) local: Arc>>>, } impl BinFactory { - pub fn new( - compiled_modules: Arc, - runtime: Arc, - ) -> BinFactory { + pub fn new(runtime: Arc) -> BinFactory { BinFactory { - commands: Commands::new_with_builtins(runtime.clone(), compiled_modules.clone()), + commands: Commands::new_with_builtins(runtime.clone()), runtime, - cache: compiled_modules, local: Arc::new(RwLock::new(HashMap::new())), } } @@ -118,10 +109,3 @@ async fn load_package_from_filesystem( Ok(pkg) } - -pub fn hash_of_binary(data: impl AsRef<[u8]>) -> String { - let mut hasher = Sha256::default(); - hasher.update(data.as_ref()); - let hash = hasher.finalize(); - hex::encode(&hash[..]) -} diff --git a/lib/wasi/src/bin_factory/module_cache.rs b/lib/wasi/src/bin_factory/module_cache.rs deleted file mode 100644 index e14fc29d68c..00000000000 --- a/lib/wasi/src/bin_factory/module_cache.rs +++ /dev/null @@ -1,208 +0,0 @@ -use std::{cell::RefCell, collections::HashMap, ops::DerefMut, path::PathBuf, sync::RwLock}; - -use anyhow::Context; -use wasmer::Module; - -use super::BinaryPackage; -use crate::WasiRuntime; - -pub const DEFAULT_COMPILED_PATH: &str = "~/.wasmer/compiled"; - -#[derive(Debug)] -pub struct ModuleCache { - pub(crate) cache_compile_dir: PathBuf, - pub(crate) cached_modules: Option>>, -} - -// FIXME: remove impls! -// Added as a stopgap to get the crate to compile again with the "js" feature. -// wasmer::Module holds a JsValue, which makes it non-sync. -#[cfg(feature = "js")] -unsafe impl Send for ModuleCache {} -#[cfg(feature = "js")] -unsafe impl Sync for ModuleCache {} - -impl Default for ModuleCache { - fn default() -> Self { - ModuleCache::new(None, true) - } -} - -thread_local! { - static THREAD_LOCAL_CACHED_MODULES: std::cell::RefCell> - = RefCell::new(HashMap::new()); -} - -impl ModuleCache { - /// Create a new [`ModuleCache`]. - /// - /// use_shared_cache enables a shared cache of modules in addition to a thread-local cache. - pub fn new(cache_compile_dir: Option, use_shared_cache: bool) -> ModuleCache { - let cache_compile_dir = cache_compile_dir.unwrap_or_else(|| { - PathBuf::from(shellexpand::tilde(DEFAULT_COMPILED_PATH).into_owned()) - }); - let _ = std::fs::create_dir_all(&cache_compile_dir); - - let cached_modules = if use_shared_cache { - Some(RwLock::new(HashMap::default())) - } else { - None - }; - - ModuleCache { - cached_modules, - cache_compile_dir, - } - } - - pub fn get_webc( - &self, - webc: &str, - runtime: &dyn WasiRuntime, - ) -> Result { - let ident = webc.parse().context("Unable to parse the package name")?; - let resolver = runtime.package_resolver(); - let client = runtime - .http_client() - .context("No HTTP client available")? - .clone(); - - runtime.task_manager().block_on(async move { - resolver - .resolve_package(&ident, &*client) - .await - .with_context(|| format!("An error occurred while fetching \"{webc}\"")) - }) - } - - pub fn get_compiled_module( - &self, - engine: &impl wasmer::AsEngineRef, - data_hash: &str, - compiler: &str, - ) -> Option { - let key = format!("{}-{}", data_hash, compiler); - - // fastest path - { - let module = THREAD_LOCAL_CACHED_MODULES.with(|cache| { - let cache = cache.borrow(); - cache.get(&key).cloned() - }); - if let Some(module) = module { - return Some(module); - } - } - - // fast path - if let Some(cache) = &self.cached_modules { - let cache = cache.read().unwrap(); - if let Some(module) = cache.get(&key) { - THREAD_LOCAL_CACHED_MODULES.with(|cache| { - let mut cache = cache.borrow_mut(); - cache.insert(key.clone(), module.clone()); - }); - return Some(module.clone()); - } - } - - // slow path - let path = self.cache_compile_dir.join(format!("{}.bin", key).as_str()); - if let Ok(data) = std::fs::read(path.as_path()) { - tracing::trace!("bin file found: {:?} [len={}]", path.as_path(), data.len()); - let mut decoder = weezl::decode::Decoder::new(weezl::BitOrder::Msb, 8); - if let Ok(data) = decoder.decode(&data[..]) { - let module_bytes = bytes::Bytes::from(data); - - // Load the module - let module = match Module::deserialize_checked(engine, &module_bytes[..]) { - Ok(m) => m, - Err(err) => { - tracing::error!( - "failed to deserialize module with hash '{data_hash}': {err}" - ); - return None; - } - }; - - if let Some(cache) = &self.cached_modules { - let mut cache = cache.write().unwrap(); - cache.insert(key.clone(), module.clone()); - } - - THREAD_LOCAL_CACHED_MODULES.with(|cache| { - let mut cache = cache.borrow_mut(); - cache.insert(key.clone(), module.clone()); - }); - return Some(module); - } - } - - // Not found - tracing::trace!("bin file not found: {:?}", path); - None - } - - pub fn set_compiled_module(&self, data_hash: &str, compiler: &str, module: &Module) { - let key = format!("{}-{}", data_hash, compiler); - - // Add the module to the local thread cache - THREAD_LOCAL_CACHED_MODULES.with(|cache| { - let mut cache = cache.borrow_mut(); - let cache = cache.deref_mut(); - cache.insert(key.clone(), module.clone()); - }); - - // Serialize the compiled module into bytes and insert it into the cache - if let Some(cache) = &self.cached_modules { - let mut cache = cache.write().unwrap(); - cache.insert(key.clone(), module.clone()); - } - - // We should also attempt to store it in the cache directory - let compiled_bytes = module.serialize().unwrap(); - - let path = self.cache_compile_dir.join(format!("{}.bin", key).as_str()); - // TODO: forward error! - let _ = std::fs::create_dir_all(path.parent().unwrap()); - let mut encoder = weezl::encode::Encoder::new(weezl::BitOrder::Msb, 8); - if let Ok(compiled_bytes) = encoder.encode(&compiled_bytes[..]) { - let _ = std::fs::write(path, &compiled_bytes[..]); - } - } -} - -#[cfg(test)] -#[cfg(feature = "sys")] -mod tests { - use std::{sync::Arc, time::Duration}; - - use tracing_subscriber::filter::LevelFilter; - - use crate::{runtime::task_manager::tokio::TokioTaskManager, PluggableRuntime}; - - use super::*; - - #[test] - fn test_module_cache() { - let _ = tracing_subscriber::fmt() - .pretty() - .with_test_writer() - .with_max_level(LevelFilter::INFO) - .try_init(); - - let cache = ModuleCache::new(None, true); - - let rt = PluggableRuntime::new(Arc::new(TokioTaskManager::shared())); - let tasks = rt.task_manager(); - - let mut store = Vec::new(); - for _ in 0..2 { - let webc = cache.get_webc("sharrattj/dash", &rt).unwrap(); - store.push(webc); - tasks - .runtime() - .block_on(tasks.sleep_now(Duration::from_secs(1))); - } - } -} diff --git a/lib/wasi/src/fs/fd.rs b/lib/wasi/src/fs/fd.rs index 94ea01eac8a..1eb147745e8 100644 --- a/lib/wasi/src/fs/fd.rs +++ b/lib/wasi/src/fs/fd.rs @@ -23,7 +23,7 @@ pub struct Fd { pub offset: Arc, /// Flags that determine how the [`Fd`] can be used. /// - /// Used when reopening a [`VirtualFile`] during [`WasiState`] deserialization. + /// Used when reopening a [`VirtualFile`] during deserialization. pub open_flags: u16, pub inode: InodeGuard, pub is_stdio: bool, @@ -40,12 +40,12 @@ impl Fd { /// This [`Fd`] will delete everything before writing. Note that truncate /// permissions require the write permission. /// - /// This permission is currently unused when deserializing [`WasiState`]. + /// This permission is currently unused when deserializing. pub const TRUNCATE: u16 = 8; /// This [`Fd`] may create a file before writing to it. Note that create /// permissions require write permissions. /// - /// This permission is currently unused when deserializing [`WasiState`]. + /// This permission is currently unused when deserializing. pub const CREATE: u16 = 16; } diff --git a/lib/wasi/src/lib.rs b/lib/wasi/src/lib.rs index f14d32b272c..90f194c844f 100644 --- a/lib/wasi/src/lib.rs +++ b/lib/wasi/src/lib.rs @@ -349,9 +349,7 @@ pub fn current_caller_id() -> WasiCallingId { .into() } -/// Create an [`Imports`] with an existing [`WasiEnv`]. `WasiEnv` -/// needs a [`WasiState`], that can be constructed from a -/// [`WasiEnvBuilder`](state::WasiEnvBuilder). +/// Create an [`Imports`] with an existing [`WasiEnv`]. pub fn generate_import_object_from_env( store: &mut impl AsStoreMut, ctx: &FunctionEnv, diff --git a/lib/wasi/src/os/command/builtins/cmd_wasmer.rs b/lib/wasi/src/os/command/builtins/cmd_wasmer.rs index edbb4471a8f..c5533a61c8b 100644 --- a/lib/wasi/src/os/command/builtins/cmd_wasmer.rs +++ b/lib/wasi/src/os/command/builtins/cmd_wasmer.rs @@ -1,4 +1,4 @@ -use std::{any::Any, ops::Deref, sync::Arc}; +use std::{any::Any, sync::Arc}; use crate::{ os::task::{OwnedTaskStatus, TaskJoinHandle}, @@ -8,7 +8,7 @@ use wasmer::{FunctionEnvMut, Store}; use wasmer_wasix_types::wasi::Errno; use crate::{ - bin_factory::{spawn_exec, BinaryPackage, ModuleCache}, + bin_factory::{spawn_exec, BinaryPackage}, syscalls::stderr_write, VirtualTaskManagerExt, WasiEnv, WasiRuntime, }; @@ -36,25 +36,18 @@ use crate::os::command::VirtualCommand; #[derive(Debug, Clone)] pub struct CmdWasmer { runtime: Arc, - cache: Arc, } impl CmdWasmer { const NAME: &str = "wasmer"; - pub fn new( - runtime: Arc, - compiled_modules: Arc, - ) -> Self { - Self { - runtime, - cache: compiled_modules, - } + pub fn new(runtime: Arc) -> Self { + Self { runtime } } } impl CmdWasmer { - fn run<'a>( + async fn run<'a>( &self, parent_ctx: &FunctionEnvMut<'a, WasiEnv>, name: &str, @@ -78,34 +71,31 @@ impl CmdWasmer { state.args = args; env.state = Arc::new(state); - // Get the binary - if let Some(binary) = self.get_package(what.clone()) { + if let Some(binary) = self.get_package(what.clone()).await { // Now run the module - spawn_exec(binary, name, store, env, &self.runtime, &self.cache) + spawn_exec(binary, name, store, env, &self.runtime).await } else { - parent_ctx.data().tasks().block_on(async move { - let _ = stderr_write( - parent_ctx, - format!("package not found - {}\r\n", what).as_bytes(), - ) - .await; - }); + let _ = stderr_write( + parent_ctx, + format!("package not found - {}\r\n", what).as_bytes(), + ) + .await; let handle = OwnedTaskStatus::new_finished_with_code(Errno::Noent.into()).handle(); Ok(handle) } + // Get the binary } else { - parent_ctx.data().tasks().block_on(async move { - let _ = stderr_write(parent_ctx, HELP_RUN.as_bytes()).await; - }); + let _ = stderr_write(parent_ctx, HELP_RUN.as_bytes()).await; let handle = OwnedTaskStatus::new_finished_with_code(Errno::Success.into()).handle(); Ok(handle) } } - pub fn get_package(&self, name: String) -> Option { - self.cache - .get_webc(name.as_str(), self.runtime.deref()) - .ok() + pub async fn get_package(&self, name: String) -> Option { + let resolver = self.runtime.package_resolver(); + let client = self.runtime.http_client()?; + let pkg = name.parse().ok()?; + resolver.resolve_package(&pkg, &client).await.ok() } } @@ -127,30 +117,35 @@ impl VirtualCommand for CmdWasmer { ) -> Result { // Read the command we want to run let env_inner = env.as_ref().ok_or(VirtualBusError::UnknownError)?; - let mut args = env_inner.state.args.iter().map(|a| a.as_str()); + let args = env_inner.state.args.clone(); + let mut args = args.iter().map(|s| s.as_str()); let _alias = args.next(); let cmd = args.next(); // Check the command - match cmd { - Some("run") => { - let what = args.next().map(|a| a.to_string()); - let args = args.map(|a| a.to_string()).collect(); - self.run(parent_ctx, name, store, env, what, args) - } - Some("--help") | None => { - parent_ctx.data().tasks().block_on(async move { - let _ = stderr_write(parent_ctx, HELP.as_bytes()).await; - }); - let handle = - OwnedTaskStatus::new_finished_with_code(Errno::Success.into()).handle(); - Ok(handle) - } - Some(what) => { - let what = Some(what.to_string()); - let args = args.map(|a| a.to_string()).collect(); - self.run(parent_ctx, name, store, env, what, args) + let fut = async { + match cmd { + Some("run") => { + let what = args.next().map(|a| a.to_string()); + let args = args.map(|a| a.to_string()).collect(); + self.run(parent_ctx, name, store, env, what, args).await + } + Some("--help") | None => { + parent_ctx.data().tasks().block_on(async move { + let _ = stderr_write(parent_ctx, HELP.as_bytes()).await; + }); + let handle = + OwnedTaskStatus::new_finished_with_code(Errno::Success.into()).handle(); + Ok(handle) + } + Some(what) => { + let what = Some(what.to_string()); + let args = args.map(|a| a.to_string()).collect(); + self.run(parent_ctx, name, store, env, what, args).await + } } - } + }; + + parent_ctx.data().tasks().block_on(fut) } } diff --git a/lib/wasi/src/os/command/mod.rs b/lib/wasi/src/os/command/mod.rs index b2723dbb1eb..d8c65a5833a 100644 --- a/lib/wasi/src/os/command/mod.rs +++ b/lib/wasi/src/os/command/mod.rs @@ -5,9 +5,7 @@ use std::{collections::HashMap, sync::Arc}; use wasmer::{FunctionEnvMut, Store}; use wasmer_wasix_types::wasi::Errno; -use crate::{ - bin_factory::ModuleCache, syscalls::stderr_write, VirtualBusError, WasiEnv, WasiRuntime, -}; +use crate::{syscalls::stderr_write, VirtualBusError, WasiEnv, WasiRuntime}; use super::task::{OwnedTaskStatus, TaskJoinHandle, TaskStatus}; @@ -45,12 +43,9 @@ impl Commands { } // TODO: this method should be somewhere on the runtime, not here. - pub fn new_with_builtins( - runtime: Arc, - compiled_modules: Arc, - ) -> Self { + pub fn new_with_builtins(runtime: Arc) -> Self { let mut cmd = Self::new(); - let cmd_wasmer = builtins::cmd_wasmer::CmdWasmer::new(runtime.clone(), compiled_modules); + let cmd_wasmer = builtins::cmd_wasmer::CmdWasmer::new(runtime.clone()); cmd.register_command(cmd_wasmer); cmd diff --git a/lib/wasi/src/os/console/mod.rs b/lib/wasi/src/os/console/mod.rs index 201d3c9e6e5..1a0d5e80db1 100644 --- a/lib/wasi/src/os/console/mod.rs +++ b/lib/wasi/src/os/console/mod.rs @@ -26,9 +26,10 @@ use wasmer_wasix_types::{types::__WASI_STDIN_FILENO, wasi::BusErrno}; use super::{cconst::ConsoleConst, common::*, task::TaskJoinHandle}; use crate::{ - bin_factory::{spawn_exec, BinFactory, ModuleCache}, + bin_factory::{spawn_exec, BinFactory}, capabilities::Capabilities, os::task::{control_plane::WasiControlPlane, process::WasiProcess}, + runtime::resolver::WebcIdentifier, VirtualBusError, VirtualTaskManagerExt, WasiEnv, WasiRuntime, }; @@ -46,7 +47,6 @@ pub struct Console { prompt: String, env: HashMap, runtime: Arc, - compiled_modules: Arc, stdin: ArcBoxFile, stdout: ArcBoxFile, stderr: ArcBoxFile, @@ -57,7 +57,6 @@ impl Console { pub fn new( webc_boot_package: &str, runtime: Arc, - compiled_modules: Arc, ) -> Self { let prog = webc_boot_package .split_once(' ') @@ -78,7 +77,6 @@ impl Console { env: HashMap::new(), runtime, prompt: "wasmer.sh".to_string(), - compiled_modules, stdin: ArcBoxFile::new(Box::new(Pipe::channel().0)), stdout: ArcBoxFile::new(Box::new(Pipe::channel().0)), stderr: ArcBoxFile::new(Box::new(Pipe::channel().0)), @@ -186,7 +184,6 @@ impl Console { .unwrap() .stdout(Box::new(self.stdout.clone())) .stderr(Box::new(self.stderr.clone())) - .compiled_modules(self.compiled_modules.clone()) .runtime(self.runtime.clone()) .capabilities(self.capabilities.clone()) .build_init() @@ -203,8 +200,25 @@ impl Console { tasks.block_on(self.draw_welcome()); } - let binary = if let Ok(binary) = self.compiled_modules.get_webc(webc, self.runtime.deref()) - { + let webc_ident: WebcIdentifier = match webc.parse() { + Ok(ident) => ident, + Err(e) => { + tracing::debug!(webc, error = &*e, "Unable to parse the WEBC identifier"); + return Err(VirtualBusError::BadRequest); + } + }; + let client = self + .runtime + .http_client() + .ok_or(VirtualBusError::UnknownError)?; + + let resolved_package = tasks.block_on( + self.runtime + .package_resolver() + .resolve_package(&webc_ident, &client), + ); + + let binary = if let Ok(binary) = resolved_package { binary } else { let mut stderr = self.stderr.clone(); @@ -242,14 +256,7 @@ impl Console { // Build the config // Run the binary - let process = spawn_exec( - binary, - prog, - store, - env, - &self.runtime, - self.compiled_modules.as_ref(), - )?; + let process = tasks.block_on(spawn_exec(binary, prog, store, env, &self.runtime))?; // Return the process Ok((process, wasi_process)) diff --git a/lib/wasi/src/os/task/thread.rs b/lib/wasi/src/os/task/thread.rs index 4a0d265d659..99a106650c6 100644 --- a/lib/wasi/src/os/task/thread.rs +++ b/lib/wasi/src/os/task/thread.rs @@ -99,9 +99,9 @@ pub struct WasiThread { /// A guard that ensures a thread is marked as terminated when dropped. /// /// Normally the thread result should be manually registered with -/// [`Thread::set_status_running`] or [`Thread::set_status_finished`], but -/// this guard can ensure that the thread is marked as terminated even if this -/// is forgotten or a panic occurs. +/// [`WasiThread::set_status_running`] or [`WasiThread::set_status_finished`], +/// but this guard can ensure that the thread is marked as terminated even if +/// this is forgotten or a panic occurs. pub struct WasiThreadRunGuard { pub thread: WasiThread, } diff --git a/lib/wasi/src/runtime/mod.rs b/lib/wasi/src/runtime/mod.rs index 25c2b054f6a..527258f885c 100644 --- a/lib/wasi/src/runtime/mod.rs +++ b/lib/wasi/src/runtime/mod.rs @@ -1,3 +1,4 @@ +pub mod module_cache; pub mod resolver; pub mod task_manager; @@ -14,7 +15,10 @@ use virtual_net::{DynVirtualNetworking, VirtualNetworking}; use crate::{ http::DynHttpClient, os::TtyBridge, - runtime::resolver::{PackageResolver, RegistryResolver}, + runtime::{ + module_cache::ModuleCache, + resolver::{PackageResolver, RegistryResolver}, + }, WasiTtyState, }; @@ -34,6 +38,9 @@ where fn package_resolver(&self) -> Arc; + /// A cache for compiled modules. + fn module_cache(&self) -> Arc; + /// Get a [`wasmer::Engine`] for module compilation. fn engine(&self) -> Option { None @@ -97,6 +104,7 @@ pub struct PluggableRuntime { pub http_client: Option, pub resolver: Arc, pub engine: Option, + pub module_cache: Arc, #[derivative(Debug = "ignore")] pub tty: Option>, } @@ -124,6 +132,7 @@ impl PluggableRuntime { engine: None, tty: None, resolver: Arc::new(resolver), + module_cache: Arc::new(module_cache::in_memory()), } } @@ -145,6 +154,14 @@ impl PluggableRuntime { self } + pub fn set_module_cache(&mut self, module_cache: M) -> &mut Self + where + M: ModuleCache + Send + Sync + 'static, + { + self.module_cache = Arc::new(module_cache); + self + } + pub fn set_resolver( &mut self, resolver: impl PackageResolver + Send + Sync + 'static, @@ -181,4 +198,8 @@ impl WasiRuntime for PluggableRuntime { fn tty(&self) -> Option<&(dyn TtyBridge + Send + Sync)> { self.tty.as_deref() } + + fn module_cache(&self) -> Arc { + self.module_cache.clone() + } } diff --git a/lib/wasi/src/runtime/module_cache/and_then.rs b/lib/wasi/src/runtime/module_cache/and_then.rs new file mode 100644 index 00000000000..e6a9669192e --- /dev/null +++ b/lib/wasi/src/runtime/module_cache/and_then.rs @@ -0,0 +1,226 @@ +use wasmer::{Engine, Module}; + +use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash}; + +/// A [`ModuleCache`] combinator which will try operations on one cache +/// and fall back to a secondary cache if they fail. +/// +/// Constructed via [`ModuleCache::and_then()`]. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AndThen { + primary: Primary, + secondary: Secondary, +} + +impl AndThen { + pub(crate) fn new(primary: Primary, secondary: Secondary) -> Self { + AndThen { primary, secondary } + } + + pub fn primary(&self) -> &Primary { + &self.primary + } + + pub fn primary_mut(&mut self) -> &mut Primary { + &mut self.primary + } + + pub fn secondary(&self) -> &Secondary { + &self.secondary + } + + pub fn secondary_mut(&mut self) -> &mut Secondary { + &mut self.secondary + } + + pub fn into_inner(self) -> (Primary, Secondary) { + let AndThen { primary, secondary } = self; + (primary, secondary) + } +} + +#[async_trait::async_trait] +impl ModuleCache for AndThen +where + Primary: ModuleCache + Send + Sync, + Secondary: ModuleCache + Send + Sync, +{ + async fn load(&self, key: ModuleHash, engine: &Engine) -> Result { + let primary_error = match self.primary.load(key, engine).await { + Ok(m) => return Ok(m), + Err(e) => e, + }; + + if let Ok(m) = self.secondary.load(key, engine).await { + // Now we've got a module, let's make sure it ends up in the primary + // cache too. + if let Err(e) = self.primary.save(key, engine, &m).await { + tracing::warn!( + %key, + error = &e as &dyn std::error::Error, + "Unable to save a module to the primary cache", + ); + } + + return Ok(m); + } + + Err(primary_error) + } + + async fn save( + &self, + key: ModuleHash, + engine: &Engine, + module: &Module, + ) -> Result<(), CacheError> { + futures::try_join!( + self.primary.save(key, engine, module), + self.secondary.save(key, engine, module) + )?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use super::*; + use crate::runtime::module_cache::SharedCache; + + const ADD_WAT: &[u8] = br#"( + module + (func + (export "add") + (param $x i64) + (param $y i64) + (result i64) + (i64.add (local.get $x) (local.get $y))) + )"#; + + #[derive(Debug)] + struct Spy { + inner: I, + success: AtomicUsize, + failures: AtomicUsize, + } + + impl Spy { + fn new(inner: I) -> Self { + Spy { + inner, + success: AtomicUsize::new(0), + failures: AtomicUsize::new(0), + } + } + + fn success(&self) -> usize { + self.success.load(Ordering::SeqCst) + } + + fn failures(&self) -> usize { + self.failures.load(Ordering::SeqCst) + } + } + + #[async_trait::async_trait] + impl ModuleCache for Spy { + async fn load(&self, key: ModuleHash, engine: &Engine) -> Result { + match self.inner.load(key, engine).await { + Ok(m) => { + self.success.fetch_add(1, Ordering::SeqCst); + Ok(m) + } + Err(e) => { + self.failures.fetch_add(1, Ordering::SeqCst); + Err(e) + } + } + } + + async fn save( + &self, + key: ModuleHash, + engine: &Engine, + module: &Module, + ) -> Result<(), CacheError> { + match self.inner.save(key, engine, module).await { + Ok(_) => { + self.success.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + Err(e) => { + self.failures.fetch_add(1, Ordering::SeqCst); + Err(e) + } + } + } + } + + #[tokio::test] + async fn load_from_primary() { + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let key = ModuleHash::from_raw([0; 32]); + let primary = SharedCache::default(); + let secondary = SharedCache::default(); + primary.save(key, &engine, &module).await.unwrap(); + let primary = Spy::new(primary); + let secondary = Spy::new(secondary); + let cache = AndThen::new(&primary, &secondary); + + let got = cache.load(key, &engine).await.unwrap(); + + // We should have received the same module + assert_eq!(module, got); + assert_eq!(primary.success(), 1); + assert_eq!(primary.failures(), 0); + // but the secondary wasn't touched at all + assert_eq!(secondary.success(), 0); + assert_eq!(secondary.failures(), 0); + // And the secondary still doesn't have our module + assert!(secondary.load(key, &engine).await.is_err()); + } + + #[tokio::test] + async fn loading_from_secondary_also_populates_primary() { + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let key = ModuleHash::from_raw([0; 32]); + let primary = SharedCache::default(); + let secondary = SharedCache::default(); + secondary.save(key, &engine, &module).await.unwrap(); + let primary = Spy::new(primary); + let secondary = Spy::new(secondary); + let cache = AndThen::new(&primary, &secondary); + + let got = cache.load(key, &engine).await.unwrap(); + + // We should have received the same module + assert_eq!(module, got); + // We got a hit on the secondary + assert_eq!(secondary.success(), 1); + assert_eq!(secondary.failures(), 0); + // the load() on our primary failed + assert_eq!(primary.failures(), 1); + // but afterwards, we updated the primary cache with our module + assert_eq!(primary.success(), 1); + assert_eq!(primary.load(key, &engine).await.unwrap(), module); + } + + #[tokio::test] + async fn saving_will_update_both() { + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let key = ModuleHash::from_raw([0; 32]); + let primary = SharedCache::default(); + let secondary = SharedCache::default(); + let cache = AndThen::new(&primary, &secondary); + + cache.save(key, &engine, &module).await.unwrap(); + + assert_eq!(primary.load(key, &engine).await.unwrap(), module); + assert_eq!(secondary.load(key, &engine).await.unwrap(), module); + } +} diff --git a/lib/wasi/src/runtime/module_cache/filesystem.rs b/lib/wasi/src/runtime/module_cache/filesystem.rs new file mode 100644 index 00000000000..81ae8f823f0 --- /dev/null +++ b/lib/wasi/src/runtime/module_cache/filesystem.rs @@ -0,0 +1,225 @@ +use std::path::{Path, PathBuf}; + +use tempfile::NamedTempFile; +use wasmer::{Engine, Module}; + +use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash}; + +/// A cache that saves modules to a folder on the host filesystem using +/// [`Module::serialize()`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileSystemCache { + cache_dir: PathBuf, +} + +impl FileSystemCache { + pub fn new(cache_dir: impl Into) -> Self { + FileSystemCache { + cache_dir: cache_dir.into(), + } + } + + pub fn cache_dir(&self) -> &Path { + &self.cache_dir + } + + fn path(&self, key: ModuleHash, deterministic_id: &str) -> PathBuf { + let artifact_version = wasmer_types::MetadataHeader::CURRENT_VERSION; + self.cache_dir + .join(format!("{deterministic_id}-v{artifact_version}")) + .join(key.to_string()) + .with_extension("bin") + } +} + +#[async_trait::async_trait] +impl ModuleCache for FileSystemCache { + async fn load(&self, key: ModuleHash, engine: &Engine) -> Result { + let path = self.path(key, engine.deterministic_id()); + + // FIXME: This will all block the thread at the moment. Ideally, + // deserializing and uncompressing would happen on a thread pool in the + // background. + // https://github.com/wasmerio/wasmer/issues/3851 + + let uncompressed = read_compressed(&path)?; + + match Module::deserialize_checked(&engine, &uncompressed) { + Ok(m) => Ok(m), + Err(e) => { + tracing::debug!( + %key, + path=%path.display(), + error=&e as &dyn std::error::Error, + "Deleting the cache file because the artifact couldn't be deserialized", + ); + + if let Err(e) = std::fs::remove_file(&path) { + tracing::warn!( + %key, + path=%path.display(), + error=&e as &dyn std::error::Error, + "Unable to remove the corrupted cache file", + ); + } + + Err(CacheError::Deserialize(e)) + } + } + } + + async fn save( + &self, + key: ModuleHash, + engine: &Engine, + module: &Module, + ) -> Result<(), CacheError> { + let path = self.path(key, engine.deterministic_id()); + + // FIXME: This will all block the thread at the moment. Ideally, + // serializing and compressing would happen on a thread pool in the + // background. + // https://github.com/wasmerio/wasmer/issues/3851 + + let parent = path + .parent() + .expect("Unreachable - always created by joining onto cache_dir"); + + if let Err(e) = std::fs::create_dir_all(parent) { + tracing::warn!( + dir=%parent.display(), + error=&e as &dyn std::error::Error, + "Unable to create the cache directory", + ); + } + + // Note: We save to a temporary file and persist() it at the end so + // concurrent readers won't see a partially written module. + let mut f = NamedTempFile::new_in(parent).map_err(CacheError::other)?; + let serialized = module.serialize()?; + + if let Err(e) = save_compressed(&mut f, &serialized) { + return Err(CacheError::FileWrite { path, error: e }); + } + + f.persist(&path).map_err(CacheError::other)?; + + Ok(()) + } +} + +fn save_compressed(writer: impl std::io::Write, data: &[u8]) -> Result<(), std::io::Error> { + let mut encoder = weezl::encode::Encoder::new(weezl::BitOrder::Msb, 8); + encoder + .into_stream(writer) + .encode_all(std::io::Cursor::new(data)) + .status?; + + Ok(()) +} + +fn read_compressed(path: &Path) -> Result, CacheError> { + let compressed = match std::fs::read(path) { + Ok(bytes) => bytes, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Err(CacheError::NotFound); + } + Err(error) => { + return Err(CacheError::FileRead { + path: path.to_path_buf(), + error, + }); + } + }; + + let mut uncompressed = Vec::new(); + let mut decoder = weezl::decode::Decoder::new(weezl::BitOrder::Msb, 8); + decoder + .into_vec(&mut uncompressed) + .decode_all(&compressed) + .status + .map_err(CacheError::other)?; + + Ok(uncompressed) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + + use tempfile::TempDir; + + use super::*; + + const ADD_WAT: &[u8] = br#"( + module + (func + (export "add") + (param $x i64) + (param $y i64) + (result i64) + (i64.add (local.get $x) (local.get $y))) + )"#; + + #[tokio::test] + async fn save_to_disk() { + let temp = TempDir::new().unwrap(); + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let cache = FileSystemCache::new(temp.path()); + let key = ModuleHash::from_raw([0; 32]); + let expected_path = cache.path(key, engine.deterministic_id()); + + cache.save(key, &engine, &module).await.unwrap(); + + assert!(expected_path.exists()); + } + + #[tokio::test] + async fn create_cache_dir_automatically() { + let temp = TempDir::new().unwrap(); + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let cache_dir = temp.path().join("this").join("doesn't").join("exist"); + assert!(!cache_dir.exists()); + let cache = FileSystemCache::new(&cache_dir); + let key = ModuleHash::from_raw([0; 32]); + + cache.save(key, &engine, &module).await.unwrap(); + + assert!(cache_dir.is_dir()); + } + + #[tokio::test] + async fn missing_file() { + let temp = TempDir::new().unwrap(); + let engine = Engine::default(); + let key = ModuleHash::from_raw([0; 32]); + let cache = FileSystemCache::new(temp.path()); + + let err = cache.load(key, &engine).await.unwrap_err(); + + assert!(matches!(err, CacheError::NotFound)); + } + + #[tokio::test] + async fn load_from_disk() { + let temp = TempDir::new().unwrap(); + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let key = ModuleHash::from_raw([0; 32]); + let cache = FileSystemCache::new(temp.path()); + let expected_path = cache.path(key, engine.deterministic_id()); + std::fs::create_dir_all(expected_path.parent().unwrap()).unwrap(); + let serialized = module.serialize().unwrap(); + save_compressed(File::create(&expected_path).unwrap(), &serialized).unwrap(); + + let module = cache.load(key, &engine).await.unwrap(); + + let exports: Vec<_> = module + .exports() + .map(|export| export.name().to_string()) + .collect(); + assert_eq!(exports, ["add"]); + } +} diff --git a/lib/wasi/src/runtime/module_cache/mod.rs b/lib/wasi/src/runtime/module_cache/mod.rs new file mode 100644 index 00000000000..928dd05dc8e --- /dev/null +++ b/lib/wasi/src/runtime/module_cache/mod.rs @@ -0,0 +1,35 @@ +mod and_then; +mod filesystem; +mod shared; +mod thread_local; +mod types; + +pub use self::{ + and_then::AndThen, + filesystem::FileSystemCache, + shared::SharedCache, + thread_local::ThreadLocalCache, + types::{CacheError, ModuleCache, ModuleHash}, +}; + +/// Get a [`ModuleCache`] which should be good enough for most in-memory use +/// cases. +/// +/// # Platform-specific Notes +/// +/// This will use the [`ThreadLocalCache`] when running in the browser. Each +/// thread lives in a separate worker, so sharing compiled modules in the +/// browser requires using a custom [`ModuleCache`] built on top of +/// [`postMessage()`][pm] and [`SharedArrayBuffer`][sab]. +/// +/// [pm]: https://developer.mozilla.org/en-US/docs/Web/API/Worker/postMessage +/// [sab]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer +pub fn in_memory() -> impl ModuleCache + Send + Sync { + cfg_if::cfg_if! { + if #[cfg(feature = "js")] { + ThreadLocalCache::default() + } else { + SharedCache::default() + } + } +} diff --git a/lib/wasi/src/runtime/module_cache/shared.rs b/lib/wasi/src/runtime/module_cache/shared.rs new file mode 100644 index 00000000000..05476e26a55 --- /dev/null +++ b/lib/wasi/src/runtime/module_cache/shared.rs @@ -0,0 +1,71 @@ +use dashmap::DashMap; +use wasmer::{Engine, Module}; + +use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash}; + +/// A [`ModuleCache`] based on a [DashMap]<[ModuleHash], [Module]>. +#[derive(Debug, Default, Clone)] +pub struct SharedCache { + modules: DashMap<(ModuleHash, String), Module>, +} + +impl SharedCache { + pub fn new() -> SharedCache { + SharedCache::default() + } +} + +#[async_trait::async_trait] +impl ModuleCache for SharedCache { + async fn load(&self, key: ModuleHash, engine: &Engine) -> Result { + let key = (key, engine.deterministic_id().to_string()); + self.modules + .get(&key) + .map(|m| m.value().clone()) + .ok_or(CacheError::NotFound) + } + + async fn save( + &self, + key: ModuleHash, + engine: &Engine, + module: &Module, + ) -> Result<(), CacheError> { + let key = (key, engine.deterministic_id().to_string()); + self.modules.insert(key, module.clone()); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const ADD_WAT: &[u8] = br#"( + module + (func + (export "add") + (param $x i64) + (param $y i64) + (result i64) + (i64.add (local.get $x) (local.get $y))) + )"#; + + #[tokio::test] + async fn round_trip_via_cache() { + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let cache = SharedCache::default(); + let key = ModuleHash::from_raw([0; 32]); + + cache.save(key, &engine, &module).await.unwrap(); + let round_tripped = cache.load(key, &engine).await.unwrap(); + + let exports: Vec<_> = round_tripped + .exports() + .map(|export| export.name().to_string()) + .collect(); + assert_eq!(exports, ["add"]); + } +} diff --git a/lib/wasi/src/runtime/module_cache/thread_local.rs b/lib/wasi/src/runtime/module_cache/thread_local.rs new file mode 100644 index 00000000000..f7212c11f8f --- /dev/null +++ b/lib/wasi/src/runtime/module_cache/thread_local.rs @@ -0,0 +1,77 @@ +use std::{cell::RefCell, collections::HashMap}; + +use wasmer::{Engine, Module}; + +use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash}; + +std::thread_local! { + static CACHED_MODULES: RefCell> + = RefCell::new(HashMap::new()); +} + +/// A cache that will cache modules in a thread-local variable. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct ThreadLocalCache {} + +impl ThreadLocalCache { + fn lookup(&self, key: ModuleHash, deterministic_id: &str) -> Option { + let key = (key, deterministic_id.to_string()); + CACHED_MODULES.with(|m| m.borrow().get(&key).cloned()) + } + + fn insert(&self, key: ModuleHash, module: &Module, deterministic_id: &str) { + let key = (key, deterministic_id.to_string()); + CACHED_MODULES.with(|m| m.borrow_mut().insert(key, module.clone())); + } +} + +#[async_trait::async_trait] +impl ModuleCache for ThreadLocalCache { + async fn load(&self, key: ModuleHash, engine: &Engine) -> Result { + self.lookup(key, engine.deterministic_id()) + .ok_or(CacheError::NotFound) + } + + async fn save( + &self, + key: ModuleHash, + engine: &Engine, + module: &Module, + ) -> Result<(), CacheError> { + self.insert(key, module, engine.deterministic_id()); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const ADD_WAT: &[u8] = br#"( + module + (func + (export "add") + (param $x i64) + (param $y i64) + (result i64) + (i64.add (local.get $x) (local.get $y))) + )"#; + + #[tokio::test(flavor = "current_thread")] + async fn round_trip_via_cache() { + let engine = Engine::default(); + let module = Module::new(&engine, ADD_WAT).unwrap(); + let cache = ThreadLocalCache::default(); + let key = ModuleHash::from_raw([0; 32]); + + cache.save(key, &engine, &module).await.unwrap(); + let round_tripped = cache.load(key, &engine).await.unwrap(); + + let exports: Vec<_> = round_tripped + .exports() + .map(|export| export.name().to_string()) + .collect(); + assert_eq!(exports, ["add"]); + } +} diff --git a/lib/wasi/src/runtime/module_cache/types.rs b/lib/wasi/src/runtime/module_cache/types.rs new file mode 100644 index 00000000000..8ed0355ec4a --- /dev/null +++ b/lib/wasi/src/runtime/module_cache/types.rs @@ -0,0 +1,196 @@ +use std::{ + fmt::{self, Debug, Display, Formatter}, + ops::Deref, + path::PathBuf, +}; + +use sha2::{Digest, Sha256}; +use wasmer::{Engine, Module}; + +use crate::runtime::module_cache::AndThen; + +/// A cache for compiled WebAssembly modules. +/// +/// ## Deterministic ID +/// +/// Implementations are encouraged to take the [`Engine::deterministic_id()`] +/// into account when saving and loading cached a [`Module`]. +/// +/// ## Assumptions +/// +/// Implementations can assume that cache keys are unique and that using the +/// same key to load or save will always result in the "same" module. +/// +/// Implementations can also assume that [`ModuleCache::load()`] will +/// be called more often than [`ModuleCache::save()`] and optimise +/// their caching strategy accordingly. +/// +#[async_trait::async_trait] +pub trait ModuleCache: Debug { + /// Load a module based on its hash. + async fn load(&self, key: ModuleHash, engine: &Engine) -> Result; + + /// Save a module so it can be retrieved with [`ModuleCache::load()`] at a + /// later time. + /// + /// # Panics + /// + /// Implementations are free to assume the [`Module`] being passed in was + /// compiled using the provided [`Engine`], and may panic if this isn't the + /// case. + async fn save( + &self, + key: ModuleHash, + engine: &Engine, + module: &Module, + ) -> Result<(), CacheError>; + + /// Chain a second cache onto this one. + /// + /// The general assumption is that each subsequent cache in the chain will + /// be significantly slower than the previous one. + /// + /// ```rust + /// use wasmer_wasix::runtime::module_cache::{ + /// ModuleCache, ThreadLocalCache, FileSystemCache, SharedCache, + /// }; + /// + /// let cache = SharedCache::default() + /// .and_then(FileSystemCache::new("~/.local/cache")); + /// ``` + fn and_then(self, other: C) -> AndThen + where + Self: Sized, + C: ModuleCache, + { + AndThen::new(self, other) + } +} + +#[async_trait::async_trait] +impl ModuleCache for D +where + D: Deref + Debug + Send + Sync, + C: ModuleCache + Send + Sync + ?Sized, +{ + async fn load(&self, key: ModuleHash, engine: &Engine) -> Result { + (**self).load(key, engine).await + } + + async fn save( + &self, + key: ModuleHash, + engine: &Engine, + module: &Module, + ) -> Result<(), CacheError> { + (**self).save(key, engine, module).await + } +} + +/// Possible errors that may occur during [`ModuleCache`] operations. +#[derive(Debug, thiserror::Error)] +pub enum CacheError { + #[error("Unable to serialize the module")] + Serialize(#[from] wasmer::SerializeError), + #[error("Unable to deserialize the module")] + Deserialize(#[from] wasmer::DeserializeError), + #[error("Unable to read from \"{}\"", path.display())] + FileRead { + path: PathBuf, + #[source] + error: std::io::Error, + }, + #[error("Unable to write to \"{}\"", path.display())] + FileWrite { + path: PathBuf, + #[source] + error: std::io::Error, + }, + /// The item was not found. + #[error("Not found")] + NotFound, + /// A catch-all variant for any other errors that may occur. + #[error(transparent)] + Other(Box), +} + +impl CacheError { + pub fn other(error: impl std::error::Error + Send + Sync + 'static) -> Self { + CacheError::Other(Box::new(error)) + } +} + +/// The SHA-256 hash of a WebAssembly module. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct ModuleHash([u8; 32]); + +impl ModuleHash { + /// Create a new [`ModuleHash`] from the raw SHA-256 hash. + pub fn from_raw(key: [u8; 32]) -> Self { + ModuleHash(key) + } + + /// Generate a new [`ModuleCache`] based on the SHA-256 hash of some bytes. + pub fn sha256(wasm: impl AsRef<[u8]>) -> Self { + let wasm = wasm.as_ref(); + + let mut hasher = Sha256::default(); + hasher.update(wasm); + ModuleHash::from_raw(hasher.finalize().into()) + } + + /// Get the raw SHA-256 hash. + pub fn as_raw(self) -> [u8; 32] { + self.0 + } +} + +impl Display for ModuleHash { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + for byte in self.0 { + write!(f, "{byte:02X}")?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_object_safe() { + let _: Option> = None; + } + + #[test] + fn key_is_displayed_as_hex() { + let key = ModuleHash::from_raw([ + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, + 0x0e, 0x0f, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, + 0x1c, 0x1d, 0x1e, 0x1f, + ]); + + let repr = key.to_string(); + + assert_eq!( + repr, + "000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F" + ); + } + + #[test] + fn module_hash_is_just_sha_256() { + let wasm = b"\0asm..."; + let raw = [ + 0x5a, 0x39, 0xfe, 0xef, 0x52, 0xe5, 0x3b, 0x8f, 0xfe, 0xdf, 0xd7, 0x05, 0x15, 0x56, + 0xec, 0x10, 0x5e, 0xd8, 0x69, 0x82, 0xf1, 0x22, 0xa0, 0x5d, 0x27, 0x28, 0xd9, 0x67, + 0x78, 0xe4, 0xeb, 0x96, + ]; + + let hash = ModuleHash::sha256(wasm); + + assert_eq!(hash.as_raw(), raw); + } +} diff --git a/lib/wasi/src/runtime/resolver/cache.rs b/lib/wasi/src/runtime/resolver/cache.rs index 1517cda3982..f0b41777a32 100644 --- a/lib/wasi/src/runtime/resolver/cache.rs +++ b/lib/wasi/src/runtime/resolver/cache.rs @@ -59,7 +59,7 @@ impl InMemoryCache { #[async_trait::async_trait] impl PackageResolver for InMemoryCache where - R: PackageResolver, + R: PackageResolver + Send + Sync, { async fn resolve_package( &self, @@ -92,6 +92,8 @@ where mod tests { use std::sync::{Arc, Mutex}; + use once_cell::sync::OnceCell; + use super::*; #[derive(Debug, Default)] @@ -117,7 +119,7 @@ mod tests { version: version.parse().unwrap(), when_cached: None, entry: None, - hash: Arc::default(), + hash: OnceCell::new(), webc_fs: None, commands: Arc::default(), uses: Vec::new(), diff --git a/lib/wasi/src/runtime/resolver/types.rs b/lib/wasi/src/runtime/resolver/types.rs index 3058494dcdd..19ef9380d52 100644 --- a/lib/wasi/src/runtime/resolver/types.rs +++ b/lib/wasi/src/runtime/resolver/types.rs @@ -1,4 +1,10 @@ -use std::{collections::BTreeMap, fmt::Display, ops::Deref, path::PathBuf, str::FromStr}; +use std::{ + collections::BTreeMap, + fmt::{Debug, Display}, + ops::Deref, + path::PathBuf, + str::FromStr, +}; use anyhow::Context; use semver::VersionReq; @@ -6,7 +12,7 @@ use semver::VersionReq; use crate::{bin_factory::BinaryPackage, http::HttpClient, runtime::resolver::InMemoryCache}; #[async_trait::async_trait] -pub trait PackageResolver: std::fmt::Debug + Send + Sync { +pub trait PackageResolver: Debug { /// Resolve a package, loading all dependencies. async fn resolve_package( &self, @@ -26,8 +32,8 @@ pub trait PackageResolver: std::fmt::Debug + Send + Sync { #[async_trait::async_trait] impl PackageResolver for D where - D: Deref + std::fmt::Debug + Send + Sync, - R: PackageResolver + ?Sized, + D: Deref + Debug + Send + Sync, + R: PackageResolver + Send + Sync + ?Sized, { /// Resolve a package, loading all dependencies. async fn resolve_package( diff --git a/lib/wasi/src/runtime/task_manager/mod.rs b/lib/wasi/src/runtime/task_manager/mod.rs index 7116d27d55e..1e3f7bbd34d 100644 --- a/lib/wasi/src/runtime/task_manager/mod.rs +++ b/lib/wasi/src/runtime/task_manager/mod.rs @@ -2,7 +2,7 @@ #[cfg(feature = "sys-thread")] pub mod tokio; -use std::{pin::Pin, time::Duration}; +use std::{ops::Deref, pin::Pin, time::Duration}; use ::tokio::runtime::Handle; use futures::Future; @@ -79,6 +79,64 @@ pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static { fn thread_parallelism(&self) -> Result; } +#[async_trait::async_trait] +impl VirtualTaskManager for D +where + D: Deref + std::fmt::Debug + Send + Sync + 'static, + T: VirtualTaskManager + ?Sized, +{ + fn build_memory( + &self, + store: &mut StoreMut, + spawn_type: SpawnType, + ) -> Result, WasiThreadError> { + (**self).build_memory(store, spawn_type) + } + + async fn sleep_now(&self, time: Duration) { + (**self).sleep_now(time).await + } + + fn task_shared( + &self, + task: Box< + dyn FnOnce() -> Pin + Send + 'static>> + Send + 'static, + >, + ) -> Result<(), WasiThreadError> { + (**self).task_shared(task) + } + + fn runtime(&self) -> &Handle { + (**self).runtime() + } + + #[allow(dyn_drop)] + fn runtime_enter<'g>(&'g self) -> Box { + (**self).runtime_enter() + } + + fn task_wasm( + &self, + task: Box) + Send + 'static>, + store: Store, + module: Module, + spawn_type: SpawnType, + ) -> Result<(), WasiThreadError> { + (**self).task_wasm(task, store, module, spawn_type) + } + + fn task_dedicated( + &self, + task: Box, + ) -> Result<(), WasiThreadError> { + (**self).task_dedicated(task) + } + + fn thread_parallelism(&self) -> Result { + (**self).thread_parallelism() + } +} + impl dyn VirtualTaskManager { /// Execute a future and return the output. /// This method blocks until the future is complete. @@ -93,14 +151,12 @@ pub trait VirtualTaskManagerExt { fn block_on<'a, A>(&self, task: impl Future + 'a) -> A; } -impl<'a, T: VirtualTaskManager> VirtualTaskManagerExt for &'a T { - fn block_on<'x, A>(&self, task: impl Future + 'x) -> A { - self.runtime().block_on(task) - } -} - -impl VirtualTaskManagerExt for std::sync::Arc { - fn block_on<'x, A>(&self, task: impl Future + 'x) -> A { +impl VirtualTaskManagerExt for D +where + D: Deref, + T: VirtualTaskManager + ?Sized, +{ + fn block_on<'a, A>(&self, task: impl Future + 'a) -> A { self.runtime().block_on(task) } } diff --git a/lib/wasi/src/runtime/task_manager/tokio.rs b/lib/wasi/src/runtime/task_manager/tokio.rs index b6ab24d7d7b..dade0380f29 100644 --- a/lib/wasi/src/runtime/task_manager/tokio.rs +++ b/lib/wasi/src/runtime/task_manager/tokio.rs @@ -43,7 +43,7 @@ impl TokioTaskManager { Ok(()) } - /// Shared tokio [`Runtime`] that is used by default. + /// Shared tokio [`VirtualTaskManager`] that is used by default. /// /// This exists because a tokio runtime is heavy, and there should not be many /// independent ones in a process. diff --git a/lib/wasi/src/state/builder.rs b/lib/wasi/src/state/builder.rs index d2ff6cf1f1f..1e5ae200848 100644 --- a/lib/wasi/src/state/builder.rs +++ b/lib/wasi/src/state/builder.rs @@ -15,7 +15,7 @@ use wasmer_wasix_types::wasi::Errno; #[cfg(feature = "sys")] use crate::PluggableRuntime; use crate::{ - bin_factory::{BinFactory, ModuleCache}, + bin_factory::BinFactory, capabilities::Capabilities, fs::{WasiFs, WasiFsRoot, WasiInodes}, os::task::control_plane::{ControlPlaneConfig, ControlPlaneError, WasiControlPlane}, @@ -52,7 +52,6 @@ pub struct WasiEnvBuilder { pub(super) preopens: Vec, /// Pre-opened virtual directories that will be accessible from WASI. vfs_preopens: Vec, - pub(super) compiled_modules: Option>, #[allow(clippy::type_complexity)] pub(super) setup_fs_fn: Option Result<(), String> + Send>>, @@ -547,13 +546,6 @@ impl WasiEnvBuilder { self.runtime = Some(runtime); } - /// Sets the compiled modules to use with this builder (sharing the - /// cached modules is better for performance and memory consumption) - pub fn compiled_modules(mut self, compiled_modules: Arc) -> Self { - self.compiled_modules = Some(compiled_modules); - self - } - pub fn capabilities(mut self, capabilities: Capabilities) -> Self { self.set_capabilities(capabilities); self @@ -701,9 +693,6 @@ impl WasiEnvBuilder { envs, }; - // TODO: this method should not exist - must have unified construction flow! - let module_cache = self.compiled_modules.unwrap_or_default(); - let runtime = self.runtime.unwrap_or_else(|| { #[cfg(feature = "sys-thread")] { @@ -719,7 +708,7 @@ impl WasiEnvBuilder { let uses = self.uses; let map_commands = self.map_commands; - let bin_factory = BinFactory::new(module_cache.clone(), runtime.clone()); + let bin_factory = BinFactory::new(runtime.clone()); let capabilities = self.capabilites; @@ -731,7 +720,6 @@ impl WasiEnvBuilder { let init = WasiEnvInit { state, runtime, - module_cache, webc_dependencies: uses, mapped_commands: map_commands, control_plane, @@ -769,7 +757,7 @@ impl WasiEnvBuilder { } /// Consumes the [`WasiEnvBuilder`] and produces a [`WasiEnvInit`], which - /// can be used to construct a new [`WasiEnv`] with [`WasiEnv::new`]. + /// can be used to construct a new [`WasiEnv`]. /// /// Returns the error from `WasiFs::new` if there's an error // FIXME: use a proper custom error type diff --git a/lib/wasi/src/state/env.rs b/lib/wasi/src/state/env.rs index ed24f00ba0d..883d8d19b20 100644 --- a/lib/wasi/src/state/env.rs +++ b/lib/wasi/src/state/env.rs @@ -16,7 +16,7 @@ use wasmer_wasix_types::{ }; use crate::{ - bin_factory::{BinFactory, ModuleCache}, + bin_factory::BinFactory, capabilities::Capabilities, fs::{WasiFsRoot, WasiInodes}, import_object_for_all_wasi_versions, @@ -203,7 +203,6 @@ unsafe impl Sync for WasiInstanceHandles {} pub struct WasiEnvInit { pub(crate) state: WasiState, pub runtime: Arc, - pub module_cache: Arc, pub webc_dependencies: Vec, pub mapped_commands: HashMap, pub bin_factory: BinFactory, @@ -245,7 +244,6 @@ impl WasiEnvInit { preopen: self.state.preopen.clone(), }, runtime: self.runtime.clone(), - module_cache: self.module_cache.clone(), webc_dependencies: self.webc_dependencies.clone(), mapped_commands: self.mapped_commands.clone(), bin_factory: self.bin_factory.clone(), @@ -286,7 +284,6 @@ pub struct WasiEnv { pub owned_handles: Vec, /// Implementation of the WASI runtime. pub runtime: Arc, - pub module_cache: Arc, pub capabilities: Capabilities, } @@ -332,7 +329,6 @@ impl WasiEnv { inner: self.inner.clone(), owned_handles: self.owned_handles.clone(), runtime: self.runtime.clone(), - module_cache: self.module_cache.clone(), capabilities: self.capabilities.clone(), } } @@ -363,7 +359,6 @@ impl WasiEnv { owned_handles: Vec::new(), runtime: self.runtime.clone(), capabilities: self.capabilities.clone(), - module_cache: self.module_cache.clone(), }; Ok((new_env, handle)) } @@ -402,7 +397,6 @@ impl WasiEnv { owned_handles: Vec::new(), runtime: init.runtime, bin_factory: init.bin_factory, - module_cache: init.module_cache.clone(), capabilities: init.capabilities, }; env.owned_handles.push(thread); @@ -775,10 +769,12 @@ impl WasiEnv { .get("/bin/wasmer") .and_then(|cmd| cmd.as_any().downcast_ref::()); + let tasks = self.runtime.task_manager(); + while let Some(use_package) = use_packages.pop_back() { if let Some(package) = cmd_wasmer .as_ref() - .and_then(|cmd| cmd.get_package(use_package.clone())) + .and_then(|cmd| tasks.block_on(cmd.get_package(use_package.clone()))) { // If its already been added make sure the version is correct let package_name = package.package_name.to_string(); diff --git a/lib/wasi/src/syscalls/wasix/proc_exec.rs b/lib/wasi/src/syscalls/wasix/proc_exec.rs index 6e1160f6c1e..218ccb3a860 100644 --- a/lib/wasi/src/syscalls/wasix/proc_exec.rs +++ b/lib/wasi/src/syscalls/wasix/proc_exec.rs @@ -101,18 +101,13 @@ pub fn proc_exec( let new_store = new_store.take().unwrap(); let env = config.take().unwrap(); - tasks.block_on(async { - let name_inner = name.clone(); - let ret = bin_factory.spawn( - name_inner, - new_store, - env, - ) - .await; + let name_inner = name.clone(); + __asyncify_light(ctx.data(), None, async { + let ret = bin_factory.spawn(name_inner, new_store, env).await; match ret { Ok(ret) => { trace!(%child_pid, "spawned sub-process"); - }, + } Err(err) => { err_exit_code = conv_bus_err_to_exit_code(err); @@ -127,10 +122,13 @@ pub fn proc_exec( &ctx, format!("wasm execute failed [{}] - {}\n", name.as_str(), err) .as_bytes(), - ).await; + ) + .await; } } - }) + + Ok(()) + }); } } }; diff --git a/lib/wasi/src/wapm/mod.rs b/lib/wasi/src/wapm/mod.rs index 4c3e5d5959b..22f2d8cb900 100644 --- a/lib/wasi/src/wapm/mod.rs +++ b/lib/wasi/src/wapm/mod.rs @@ -1,8 +1,9 @@ use anyhow::{bail, Context}; +use once_cell::sync::OnceCell; use std::{ collections::HashMap, path::Path, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, RwLock}, }; use url::Url; use virtual_fs::{FileSystem, WebcVolumeFileSystem}; @@ -284,7 +285,7 @@ fn parse_webc_v2(webc: &Container) -> Result { .unwrap() as u128, ), entry: entry.map(Into::into), - hash: Arc::new(Mutex::new(None)), + hash: OnceCell::new(), webc_fs: Some(Arc::new(webc_fs)), commands: Arc::new(RwLock::new(commands.into_values().collect())), uses,