diff --git a/lib/wasi-types/src/wasi/bindings.rs b/lib/wasi-types/src/wasi/bindings.rs index 9dc9f012bd4..1f31133b322 100644 --- a/lib/wasi-types/src/wasi/bindings.rs +++ b/lib/wasi-types/src/wasi/bindings.rs @@ -1117,52 +1117,7 @@ impl core::fmt::Debug for Tty { .finish() } } -#[repr(u8)] -#[derive(Clone, Copy, PartialEq, Eq)] -pub enum BusDataFormat { - Raw, - Bincode, - MessagePack, - Json, - Yaml, - Xml, - Rkyv, -} -impl core::fmt::Debug for BusDataFormat { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - BusDataFormat::Raw => f.debug_tuple("BusDataFormat::Raw").finish(), - BusDataFormat::Bincode => f.debug_tuple("BusDataFormat::Bincode").finish(), - BusDataFormat::MessagePack => f.debug_tuple("BusDataFormat::MessagePack").finish(), - BusDataFormat::Json => f.debug_tuple("BusDataFormat::Json").finish(), - BusDataFormat::Yaml => f.debug_tuple("BusDataFormat::Yaml").finish(), - BusDataFormat::Xml => f.debug_tuple("BusDataFormat::Xml").finish(), - BusDataFormat::Rkyv => f.debug_tuple("BusDataFormat::Rkyv").finish(), - } - } -} -#[repr(u8)] -#[derive(Clone, Copy, PartialEq, Eq)] -pub enum BusEventType { - Noop, - Exit, - Call, - Result, - Fault, - Close, -} -impl core::fmt::Debug for BusEventType { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - BusEventType::Noop => f.debug_tuple("BusEventType::Noop").finish(), - BusEventType::Exit => f.debug_tuple("BusEventType::Exit").finish(), - BusEventType::Call => f.debug_tuple("BusEventType::Call").finish(), - BusEventType::Result => f.debug_tuple("BusEventType::Result").finish(), - BusEventType::Fault => f.debug_tuple("BusEventType::Fault").finish(), - BusEventType::Close => f.debug_tuple("BusEventType::Close").finish(), - } - } -} + pub type Bid = u32; pub type Cid = u64; #[doc = " __wasi_option_t"] @@ -2389,34 +2344,6 @@ impl core::fmt::Debug for Timeout { } } } -#[repr(C)] -#[derive(Copy, Clone)] -pub struct BusEvent { - pub tag: BusEventType, - pub padding: (u64, u64, u64, u64, u64, u64, u64, u32, u16, u8), -} -impl core::fmt::Debug for BusEvent { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("BusEvent") - .field("tag", &self.tag) - .field("padding", &self.padding) - .finish() - } -} -#[repr(C)] -#[derive(Copy, Clone)] -pub struct BusEvent2 { - pub tag: BusEventType, - pub event: BusEvent, -} -impl core::fmt::Debug for BusEvent2 { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("BusEvent2") - .field("tag", &self.tag) - .field("event", &self.event) - .finish() - } -} // TODO: if necessary, must be implemented in wit-bindgen unsafe impl ValueType for Snapshot0Clockid { @@ -3068,69 +2995,6 @@ unsafe impl ValueType for Tty { fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} } -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusDataFormat { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} - -unsafe impl wasmer::FromToNativeWasmType for BusDataFormat { - type Native = i32; - - fn to_native(self) -> Self::Native { - self as i32 - } - - fn from_native(n: Self::Native) -> Self { - match n { - 0 => Self::Raw, - 1 => Self::Bincode, - 2 => Self::MessagePack, - 3 => Self::Json, - 4 => Self::Yaml, - 5 => Self::Xml, - 6 => Self::Rkyv, - - q => todo!("could not serialize number {q} to enum BusDataFormat"), - } - } - - fn is_from_store(&self, _store: &impl wasmer::AsStoreRef) -> bool { - false - } -} - -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusEventType { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} - -unsafe impl wasmer::FromToNativeWasmType for BusEventType { - type Native = i32; - - fn to_native(self) -> Self::Native { - self as i32 - } - - fn from_native(n: Self::Native) -> Self { - match n { - 0 => Self::Noop, - 1 => Self::Exit, - 2 => Self::Call, - 3 => Self::Result, - 4 => Self::Fault, - 5 => Self::Close, - - q => todo!("could not serialize number {q} to enum BusEventType"), - } - } - - fn is_from_store(&self, _store: &impl wasmer::AsStoreRef) -> bool { - false - } -} - // TODO: if necessary, must be implemented in wit-bindgen unsafe impl ValueType for OptionTag { #[inline] @@ -3683,15 +3547,3 @@ unsafe impl wasmer::FromToNativeWasmType for Timeout { false } } - -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusEvent { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} - -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusEvent2 { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} diff --git a/lib/wasi-types/src/wasi/bindings_manual.rs b/lib/wasi-types/src/wasi/bindings_manual.rs index 53651763aa2..387471f1106 100644 --- a/lib/wasi-types/src/wasi/bindings_manual.rs +++ b/lib/wasi-types/src/wasi/bindings_manual.rs @@ -122,12 +122,6 @@ impl From for SubscriptionClock { } } -impl std::fmt::Display for BusDataFormat { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - impl std::fmt::Display for Sockoption { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let s = match *self { diff --git a/lib/wasi/src/bin_factory/exec.rs b/lib/wasi/src/bin_factory/exec.rs index 0e7fcaedb2b..047d1a7a6cf 100644 --- a/lib/wasi/src/bin_factory/exec.rs +++ b/lib/wasi/src/bin_factory/exec.rs @@ -1,22 +1,17 @@ -use std::{ - ops::DerefMut, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll}, -}; +use std::{pin::Pin, sync::Arc}; -use crate::vbus::{ - BusSpawnedProcess, VirtualBusError, VirtualBusInvokable, VirtualBusProcess, VirtualBusScope, +use crate::{ + os::task::{thread::WasiThreadGuard, TaskJoinHandle}, + VirtualBusError, WasiRuntimeError, }; use futures::Future; -use tokio::sync::mpsc; use tracing::*; use wasmer::{FunctionEnvMut, Instance, Memory, Module, Store}; use wasmer_wasi_types::wasi::{Errno, ExitCode}; use super::{BinFactory, BinaryPackage, ModuleCache}; use crate::{ - import_object_for_all_wasi_versions, runtime::SpawnType, SpawnedMemory, WasiEnv, WasiError, + import_object_for_all_wasi_versions, runtime::SpawnType, SpawnedMemory, WasiEnv, WasiFunctionEnv, WasiRuntime, }; @@ -27,7 +22,7 @@ pub fn spawn_exec( env: WasiEnv, runtime: &Arc, compiled_modules: &ModuleCache, -) -> Result { +) -> Result { // Load the module #[cfg(feature = "sys")] let compiler = store.engine().name(); @@ -69,12 +64,7 @@ pub fn spawn_exec( env.state.fs.conditional_union(&binary); // Now run the module - let mut ret = spawn_exec_module(module, store, env, runtime); - if let Ok(ret) = ret.as_mut() { - ret.module_memory_footprint = binary.module_memory_footprint; - ret.file_system_memory_footprint = binary.file_system_memory_footprint; - } - ret + spawn_exec_module(module, store, env, runtime) } pub fn spawn_exec_module( @@ -82,16 +72,14 @@ pub fn spawn_exec_module( store: Store, env: WasiEnv, runtime: &Arc, -) -> Result { +) -> Result { // Create a new task manager let tasks = runtime.task_manager(); // Create the signaler let pid = env.pid(); - let signaler = Box::new(env.process.clone()); - // Now run the binary - let (exit_code_tx, exit_code_rx) = mpsc::unbounded_channel(); + let join_handle = env.thread.join_handle(); { // Determine if shared memory needs to be created and imported let shared_memory = module.imports().memories().next().map(|a| *a.ty()); @@ -131,6 +119,8 @@ pub fn spawn_exec_module( } }; + let thread = WasiThreadGuard::new(wasi_env.thread.clone()); + let mut wasi_env = WasiFunctionEnv::new(&mut store, wasi_env); // Let's instantiate the module with the imports. @@ -167,19 +157,8 @@ pub fn spawn_exec_module( // If this module exports an _initialize function, run that first. if let Ok(initialize) = instance.exports.get_function("_initialize") { - if let Err(e) = initialize.call(&mut store, &[]) { - let code = match e.downcast::() { - Ok(WasiError::Exit(code)) => code as ExitCode, - Ok(WasiError::UnknownWasiVersion) => { - debug!("wasi[{}]::exec-failed: unknown wasi version", pid); - Errno::Noexec as ExitCode - } - Err(err) => { - debug!("wasi[{}]::exec-failed: runtime error - {}", pid, err); - Errno::Noexec as ExitCode - } - }; - let _ = exit_code_tx.send(code); + if let Err(err) = initialize.call(&mut store, &[]) { + thread.thread.set_status_finished(Err(err.into())); wasi_env .data(&store) .cleanup(Some(Errno::Noexec as ExitCode)); @@ -193,33 +172,29 @@ pub fn spawn_exec_module( // If there is a start function debug!("wasi[{}]::called main()", pid); // TODO: rewrite to use crate::run_wasi_func + + thread.thread.set_status_running(); + let ret = if let Some(start) = start { - match start.call(&mut store, &[]) { - Ok(_) => 0, - Err(e) => match e.downcast::() { - Ok(WasiError::Exit(code)) => code, - Ok(WasiError::UnknownWasiVersion) => { - debug!("wasi[{}]::exec-failed: unknown wasi version", pid); - Errno::Noexec as u32 - } - Err(err) => { - debug!("wasi[{}]::exec-failed: runtime error - {}", pid, err); - 9999u32 - } - }, - } + start + .call(&mut store, &[]) + .map_err(WasiRuntimeError::from) + .map(|_| 0) } else { debug!("wasi[{}]::exec-failed: missing _start function", pid); - Errno::Noexec as u32 + Ok(Errno::Noexec as u32) }; - debug!("wasi[{}]::main() has exited with {}", pid, ret); + debug!("wasi[{pid}]::main() has exited with {ret:?}"); - // Cleanup the environment - wasi_env.data(&store).cleanup(Some(ret)); + let code = if let Err(err) = &ret { + err.as_exit_code().unwrap_or(Errno::Child as u32) + } else { + 0 + }; + thread.thread.set_status_finished(ret); - // Send the result - let _ = exit_code_tx.send(ret); - drop(exit_code_tx); + // Cleanup the environment + wasi_env.data(&store).cleanup(Some(code)); } }; @@ -247,19 +222,7 @@ pub fn spawn_exec_module( })? }; - let inst = Box::new(SpawnedProcess { - exit_code: Mutex::new(None), - exit_code_rx: Mutex::new(exit_code_rx), - }); - Ok(BusSpawnedProcess { - inst, - stdin: None, - stdout: None, - stderr: None, - signaler: Some(signaler), - module_memory_footprint: 0, - file_system_memory_footprint: 0, - }) + Ok(join_handle) } impl BinFactory { @@ -268,7 +231,7 @@ impl BinFactory { name: String, store: Store, env: WasiEnv, - ) -> Pin> + 'a>> { + ) -> Pin> + 'a>> { Box::pin(async move { // Find the binary (or die trying) and make the spawn type let binary = self @@ -298,7 +261,7 @@ impl BinFactory { parent_ctx: Option<&FunctionEnvMut<'_, WasiEnv>>, store: &mut Option, builder: &mut Option, - ) -> Result { + ) -> Result { // We check for built in commands if let Some(parent_ctx) = parent_ctx { if self.commands.exists(name.as_str()) { @@ -312,61 +275,3 @@ impl BinFactory { Err(VirtualBusError::NotFound) } } - -#[derive(Debug)] -pub(crate) struct SpawnedProcess { - pub exit_code: Mutex>, - pub exit_code_rx: Mutex>, -} - -impl VirtualBusProcess for SpawnedProcess { - fn exit_code(&self) -> Option { - let mut exit_code = self.exit_code.lock().unwrap(); - if let Some(exit_code) = exit_code.as_ref() { - return Some(*exit_code); - } - let mut rx = self.exit_code_rx.lock().unwrap(); - match rx.try_recv() { - Ok(code) => { - exit_code.replace(code); - Some(code) - } - Err(mpsc::error::TryRecvError::Disconnected) => { - let code = Errno::Canceled as ExitCode; - exit_code.replace(code); - Some(code) - } - _ => None, - } - } - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - { - let exit_code = self.exit_code.lock().unwrap(); - if exit_code.is_some() { - return Poll::Ready(()); - } - } - let mut rx = self.exit_code_rx.lock().unwrap(); - let mut rx = Pin::new(rx.deref_mut()); - match rx.poll_recv(cx) { - Poll::Ready(code) => { - let code = code.unwrap_or(Errno::Canceled as ExitCode); - { - let mut exit_code = self.exit_code.lock().unwrap(); - exit_code.replace(code); - } - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } -} - -impl VirtualBusScope for SpawnedProcess { - fn poll_finished(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - VirtualBusProcess::poll_ready(self, cx) - } -} - -impl VirtualBusInvokable for SpawnedProcess {} diff --git a/lib/wasi/src/bin_factory/mod.rs b/lib/wasi/src/bin_factory/mod.rs index a54bfebb002..d39becba4d3 100644 --- a/lib/wasi/src/bin_factory/mod.rs +++ b/lib/wasi/src/bin_factory/mod.rs @@ -10,7 +10,6 @@ mod binary_package; mod exec; mod module_cache; -pub(crate) use exec::SpawnedProcess; use sha2::*; pub use self::{ diff --git a/lib/wasi/src/lib.rs b/lib/wasi/src/lib.rs index 5757a4d4b0e..cb513f3581a 100644 --- a/lib/wasi/src/lib.rs +++ b/lib/wasi/src/lib.rs @@ -44,7 +44,6 @@ pub mod runtime; mod state; mod syscalls; mod utils; -pub mod vbus; pub mod wapm; /// WAI based bindings. @@ -70,7 +69,6 @@ use wasmer::{ MemorySize, RuntimeError, }; -pub use crate::vbus::BusSpawnedProcessJoin; pub use wasmer_vfs; #[deprecated(since = "2.1.0", note = "Please use `wasmer_vfs::FsError`")] pub use wasmer_vfs::FsError as WasiFsError; @@ -154,6 +152,72 @@ impl From for u32 { pub const DEFAULT_STACK_SIZE: u64 = 1_048_576u64; pub const DEFAULT_STACK_BASE: u64 = DEFAULT_STACK_SIZE; +// TODO: remove, this is a leftover from an old vbus crate and should be folded +// into WasiRuntimeError. +#[derive(Error, Copy, Clone, Debug, PartialEq, Eq)] +pub enum VirtualBusError { + /// Failed during serialization + #[error("serialization failed")] + Serialization, + /// Failed during deserialization + #[error("deserialization failed")] + Deserialization, + /// Invalid WAPM process + #[error("invalid wapm")] + InvalidWapm, + /// Failed to fetch the WAPM process + #[error("fetch failed")] + FetchFailed, + /// Failed to compile the WAPM process + #[error("compile error")] + CompileError, + /// Invalid ABI + #[error("WAPM process has an invalid ABI")] + InvalidABI, + /// Call was aborted + #[error("call aborted")] + Aborted, + /// Bad handle + #[error("bad handle")] + BadHandle, + /// Invalid topic + #[error("invalid topic")] + InvalidTopic, + /// Invalid callback + #[error("invalid callback")] + BadCallback, + /// Call is unsupported + #[error("unsupported")] + Unsupported, + /// Not found + #[error("not found")] + NotFound, + /// Bad request + #[error("bad request")] + BadRequest, + /// Access denied + #[error("access denied")] + AccessDenied, + /// Internal error has occured + #[error("internal error")] + InternalError, + /// Memory allocation failed + #[error("memory allocation failed")] + MemoryAllocationFailed, + /// Invocation has failed + #[error("invocation has failed")] + InvokeFailed, + /// Already consumed + #[error("already consumed")] + AlreadyConsumed, + /// Memory access violation + #[error("memory access violation")] + MemoryAccessViolation, + /// Some other unhandled error. If you see this, it's probably a bug. + #[error("unknown error found")] + UnknownError, +} + #[derive(thiserror::Error, Debug)] pub enum WasiRuntimeError { #[error("WASI state setup failed")] diff --git a/lib/wasi/src/os/command/builtins/cmd_wasmer.rs b/lib/wasi/src/os/command/builtins/cmd_wasmer.rs index b897f617ffb..eb184410d13 100644 --- a/lib/wasi/src/os/command/builtins/cmd_wasmer.rs +++ b/lib/wasi/src/os/command/builtins/cmd_wasmer.rs @@ -1,6 +1,9 @@ use std::{any::Any, ops::Deref, sync::Arc}; -use crate::vbus::{BusSpawnedProcess, VirtualBusError}; +use crate::{ + os::task::{OwnedTaskStatus, TaskJoinHandle}, + VirtualBusError, +}; use wasmer::{FunctionEnvMut, Store}; use wasmer_wasi_types::wasi::Errno; @@ -59,7 +62,7 @@ impl CmdWasmer { config: &mut Option, what: Option, mut args: Vec, - ) -> Result { + ) -> Result { if let Some(what) = what { let store = store.take().ok_or(VirtualBusError::UnknownError)?; let mut env = config.take().ok_or(VirtualBusError::UnknownError)?; @@ -83,13 +86,15 @@ impl CmdWasmer { ) .await; }); - Ok(BusSpawnedProcess::exited_process(Errno::Noent as u32)) + let handle = OwnedTaskStatus::new_finished_with_code(Errno::Noent as u32).handle(); + Ok(handle) } } else { parent_ctx.data().tasks().block_on(async move { let _ = stderr_write(parent_ctx, HELP_RUN.as_bytes()).await; }); - Ok(BusSpawnedProcess::exited_process(0)) + let handle = OwnedTaskStatus::new_finished_with_code(0).handle(); + Ok(handle) } } @@ -118,7 +123,7 @@ impl VirtualCommand for CmdWasmer { name: &str, store: &mut Option, env: &mut Option, - ) -> Result { + ) -> Result { // Read the command we want to run let env_inner = env.as_ref().ok_or(VirtualBusError::UnknownError)?; let mut args = env_inner.state.args.iter().map(|a| a.as_str()); @@ -136,7 +141,8 @@ impl VirtualCommand for CmdWasmer { parent_ctx.data().tasks().block_on(async move { let _ = stderr_write(parent_ctx, HELP.as_bytes()).await; }); - Ok(BusSpawnedProcess::exited_process(0)) + let handle = OwnedTaskStatus::new_finished_with_code(0).handle(); + Ok(handle) } Some(what) => { let what = Some(what.to_string()); diff --git a/lib/wasi/src/os/command/mod.rs b/lib/wasi/src/os/command/mod.rs index e2a2f0db55a..dbe78f9c125 100644 --- a/lib/wasi/src/os/command/mod.rs +++ b/lib/wasi/src/os/command/mod.rs @@ -2,11 +2,14 @@ pub mod builtins; use std::{collections::HashMap, sync::Arc}; -use crate::vbus::{BusSpawnedProcess, VirtualBusError}; use wasmer::{FunctionEnvMut, Store}; use wasmer_wasi_types::wasi::Errno; -use crate::{bin_factory::ModuleCache, syscalls::stderr_write, WasiEnv, WasiRuntime}; +use crate::{ + bin_factory::ModuleCache, syscalls::stderr_write, VirtualBusError, WasiEnv, WasiRuntime, +}; + +use super::task::{OwnedTaskStatus, TaskJoinHandle, TaskStatus}; /// A command available to an OS environment. pub trait VirtualCommand @@ -26,7 +29,7 @@ where path: &str, store: &mut Option, config: &mut Option, - ) -> Result; + ) -> Result; } #[derive(Debug, Clone)] @@ -88,7 +91,7 @@ impl Commands { path: &str, store: &mut Option, builder: &mut Option, - ) -> Result { + ) -> Result { let path = path.to_string(); if let Some(cmd) = self.commands.get(&path) { cmd.exec(parent_ctx, path.as_str(), store, builder) @@ -97,7 +100,9 @@ impl Commands { parent_ctx, format!("wasm command unknown - {}\r\n", path).as_bytes(), ); - Ok(BusSpawnedProcess::exited_process(Errno::Noent as u32)) + + let res = OwnedTaskStatus::new(TaskStatus::Finished(Ok(Errno::Noent as u32))); + Ok(res.handle()) } } } diff --git a/lib/wasi/src/os/console/mod.rs b/lib/wasi/src/os/console/mod.rs index a271e4764e6..9fca12aaca2 100644 --- a/lib/wasi/src/os/console/mod.rs +++ b/lib/wasi/src/os/console/mod.rs @@ -11,7 +11,6 @@ use std::{ sync::{atomic::AtomicBool, Arc, Mutex}, }; -use crate::vbus::{BusSpawnedProcess, VirtualBusError}; use derivative::*; use linked_hash_set::LinkedHashSet; use tokio::sync::{mpsc, RwLock}; @@ -25,12 +24,12 @@ use wasmer_vfs::{ }; use wasmer_wasi_types::{types::__WASI_STDIN_FILENO, wasi::BusErrno}; -use super::{cconst::ConsoleConst, common::*}; +use super::{cconst::ConsoleConst, common::*, task::TaskJoinHandle}; use crate::{ bin_factory::{spawn_exec, BinFactory, ModuleCache}, os::task::{control_plane::WasiControlPlane, process::WasiProcess}, state::Capabilities, - VirtualTaskManagerExt, WasiEnv, WasiRuntime, + VirtualBusError, VirtualTaskManagerExt, WasiEnv, WasiRuntime, }; #[derive(Derivative)] @@ -146,7 +145,7 @@ impl Console { self } - pub fn run(&mut self) -> Result<(BusSpawnedProcess, WasiProcess), VirtualBusError> { + pub fn run(&mut self) -> Result<(TaskJoinHandle, WasiProcess), VirtualBusError> { // Extract the program name from the arguments let empty_args: Vec<&[u8]> = Vec::new(); let (webc, prog, args) = match self.boot_cmd.split_once(' ') { @@ -220,7 +219,7 @@ impl Console { .ok(); }); tracing::debug!("failed to get webc dependency - {}", webc); - return Err(crate::vbus::VirtualBusError::NotFound); + return Err(VirtualBusError::NotFound); }; let wasi_process = env.process.clone(); diff --git a/lib/wasi/src/os/task/mod.rs b/lib/wasi/src/os/task/mod.rs index 5d17c9871cf..95fe609c93f 100644 --- a/lib/wasi/src/os/task/mod.rs +++ b/lib/wasi/src/os/task/mod.rs @@ -3,5 +3,9 @@ pub mod control_plane; pub mod process; pub mod signal; -pub mod task_join_handle; +mod task_join_handle; pub mod thread; + +pub use task_join_handle::{ + OwnedTaskStatus, TaskJoinHandle, TaskStatus, TaskTerminatedError, VirtualTaskHandle, +}; diff --git a/lib/wasi/src/os/task/process.rs b/lib/wasi/src/os/task/process.rs index aa9554518b6..829d5cd5ed8 100644 --- a/lib/wasi/src/os/task/process.rs +++ b/lib/wasi/src/os/task/process.rs @@ -9,7 +9,7 @@ use std::{ time::Duration, }; -use crate::vbus::{BusSpawnedProcess, SignalHandlerAbi}; +use crate::WasiRuntimeError; use tracing::trace; use wasmer_wasi_types::{ types::Signal, @@ -22,7 +22,11 @@ use crate::{ WasiThread, WasiThreadHandle, WasiThreadId, }; -use super::{control_plane::ControlPlaneError, task_join_handle::TaskJoinHandle}; +use super::{ + control_plane::ControlPlaneError, + signal::{SignalDeliveryError, SignalHandlerAbi}, + task_join_handle::{OwnedTaskStatus, TaskJoinHandle}, +}; /// Represents the ID of a sub-process #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -79,7 +83,7 @@ pub struct WasiProcess { // (we don't want cyclical references) pub(crate) compute: WasiControlPlane, /// Reference to the exit code for the main thread - pub(crate) finished: Arc, + pub(crate) finished: Arc, /// List of all the children spawned from this thread pub(crate) children: Arc>>, /// Number of threads waiting for children to exit @@ -104,11 +108,12 @@ pub struct WasiProcessInner { /// Signals that will be triggered at specific intervals pub signal_intervals: HashMap, /// Represents all the process spun up as a bus process - pub bus_processes: HashMap>, + pub bus_processes: HashMap, /// Indicates if the bus process can be reused pub bus_process_reuse: HashMap, WasiProcessId>, } +// TODO: why do we need this, how is it used? pub(crate) struct WasiProcessWait { waiting: Arc, } @@ -146,7 +151,7 @@ impl WasiProcess { bus_process_reuse: Default::default(), })), children: Arc::new(RwLock::new(Default::default())), - finished: Arc::new(TaskJoinHandle::new()), + finished: Arc::new(OwnedTaskStatus::default()), waiting: Arc::new(AtomicU32::new(0)), } } @@ -189,7 +194,7 @@ impl WasiProcess { is_main = true; self.finished.clone() } else { - Arc::new(TaskJoinHandle::new()) + Arc::new(OwnedTaskStatus::default()) }; let ctrl = WasiThread::new(self.pid(), id, is_main, finished, task_count_guard); @@ -277,19 +282,19 @@ impl WasiProcess { inner.thread_count } - /// Waits until the process is finished or the timeout is reached - pub async fn join(&self) -> Option { + /// Waits until the process is finished. + pub async fn join(&self) -> Result> { let _guard = WasiProcessWait::new(self); self.finished.await_termination().await } /// Attempts to join on the process - pub fn try_join(&self) -> Option { - self.finished.get_exit_code() + pub fn try_join(&self) -> Option>> { + self.finished.status().into_finished() } /// Waits for all the children to be finished - pub async fn join_children(&mut self) -> Option { + pub async fn join_children(&mut self) -> Option>> { let _guard = WasiProcessWait::new(self); let children: Vec<_> = { let children = self.children.read().unwrap(); @@ -313,47 +318,48 @@ impl WasiProcess { futures::future::join_all(waits.into_iter()) .await .into_iter() - .find_map(|a| a) + .next() } /// Waits for any of the children to finished pub async fn join_any_child(&mut self) -> Result, Errno> { let _guard = WasiProcessWait::new(self); - loop { - let children: Vec<_> = { - let children = self.children.read().unwrap(); - children.clone() - }; - if children.is_empty() { - return Err(Errno::Child); - } + let children: Vec<_> = { + let children = self.children.read().unwrap(); + children.clone() + }; + if children.is_empty() { + return Err(Errno::Child); + } - let mut waits = Vec::new(); - for pid in children { - if let Some(process) = self.compute.get_process(pid) { - let children = self.children.clone(); - waits.push(async move { - let join = process.join().await; - let mut children = children.write().unwrap(); - children.retain(|a| *a != pid); - join.map(|exit_code| (pid, exit_code)) - }) - } - } - let woke = futures::future::select_all(waits.into_iter().map(|a| Box::pin(a))) - .await - .0; - if let Some((pid, exit_code)) = woke { - return Ok(Some((pid, exit_code))); + let mut waits = Vec::new(); + for pid in children { + if let Some(process) = self.compute.get_process(pid) { + let children = self.children.clone(); + waits.push(async move { + let join = process.join().await; + let mut children = children.write().unwrap(); + children.retain(|a| *a != pid); + (pid, join) + }) } } + let (pid, res) = futures::future::select_all(waits.into_iter().map(|a| Box::pin(a))) + .await + .0; + + let code = res.unwrap_or_else(|e| e.as_exit_code().unwrap_or(Errno::Canceled as u32)); + + Ok(Some((pid, code))) } /// Terminate the process and all its threads pub fn terminate(&self, exit_code: ExitCode) { + // FIXME: this is wrong, threads might still be running! + // Need special logic for the main thread. let guard = self.inner.read().unwrap(); for thread in guard.threads.values() { - thread.terminate(exit_code) + thread.set_status_finished(Ok(exit_code)) } } @@ -364,9 +370,12 @@ impl WasiProcess { } impl SignalHandlerAbi for WasiProcess { - fn signal(&self, sig: u8) { + fn signal(&self, sig: u8) -> Result<(), SignalDeliveryError> { if let Ok(sig) = sig.try_into() { self.signal_process(sig); + Ok(()) + } else { + Err(SignalDeliveryError) } } } diff --git a/lib/wasi/src/os/task/signal.rs b/lib/wasi/src/os/task/signal.rs index d4da66557e1..1ed12bd62a3 100644 --- a/lib/wasi/src/os/task/signal.rs +++ b/lib/wasi/src/os/task/signal.rs @@ -2,6 +2,19 @@ use std::time::Duration; use wasmer_wasi_types::types::Signal; +#[derive(thiserror::Error, Debug)] +#[error("Signal could not be delivered")] +pub struct SignalDeliveryError; + +/// Signal handles...well...they process signals +pub trait SignalHandlerAbi +where + Self: std::fmt::Debug, +{ + /// Processes a signal + fn signal(&self, signal: u8) -> Result<(), SignalDeliveryError>; +} + #[derive(Debug)] pub struct WasiSignalInterval { /// Signal that will be raised diff --git a/lib/wasi/src/os/task/task_join_handle.rs b/lib/wasi/src/os/task/task_join_handle.rs index 83168237952..be56cadb9e8 100644 --- a/lib/wasi/src/os/task/task_join_handle.rs +++ b/lib/wasi/src/os/task/task_join_handle.rs @@ -1,58 +1,174 @@ -use std::sync::Mutex; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; -use wasmer_wasi_types::wasi::ExitCode; +use wasmer_wasi_types::wasi::{Errno, ExitCode}; + +use crate::WasiRuntimeError; + +#[derive(Clone, Debug)] +pub enum TaskStatus { + Pending, + Running, + Finished(Result>), +} + +impl TaskStatus { + /// Returns `true` if the task status is [`Pending`]. + /// + /// [`Pending`]: TaskStatus::Pending + #[must_use] + pub fn is_pending(&self) -> bool { + matches!(self, Self::Pending) + } + + /// Returns `true` if the task status is [`Running`]. + /// + /// [`Running`]: TaskStatus::Running + #[must_use] + pub fn is_running(&self) -> bool { + matches!(self, Self::Running) + } + + pub fn into_finished(self) -> Option>> { + match self { + Self::Finished(res) => Some(res), + _ => None, + } + } +} + +#[derive(thiserror::Error, Debug)] +#[error("Task already terminated")] +pub struct TaskTerminatedError; + +pub trait VirtualTaskHandle: std::fmt::Debug + Send + Sync + 'static { + fn status(&self) -> TaskStatus; + + /// Polls to check if the process is ready yet to receive commands + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; + + fn poll_finished( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>>; +} /// A handle that allows awaiting the termination of a task, and retrieving its exit code. #[derive(Debug)] -pub struct TaskJoinHandle { - exit_code: Mutex>, - sender: tokio::sync::broadcast::Sender<()>, +pub struct OwnedTaskStatus { + watch: tokio::sync::watch::Sender, } -impl TaskJoinHandle { - pub fn new() -> Self { - let (sender, _) = tokio::sync::broadcast::channel(1); +impl OwnedTaskStatus { + pub fn new(status: TaskStatus) -> Self { Self { - exit_code: Mutex::new(None), - sender, + watch: tokio::sync::watch::channel(status).0, } } + pub fn new_finished_with_code(code: ExitCode) -> Self { + Self::new(TaskStatus::Finished(Ok(code))) + } + /// Marks the task as finished. - pub(super) fn terminate(&self, exit_code: u32) { - let mut lock = self.exit_code.lock().unwrap(); - if lock.is_none() { - *lock = Some(exit_code); - std::mem::drop(lock); - self.sender.send(()).ok(); - } + pub fn set_running(&self) { + self.watch.send_modify(|value| { + // Only set to running if task was pending, otherwise the transition would be invalid. + if value.is_pending() { + *value = TaskStatus::Running; + } + }) + } + + /// Marks the task as finished. + pub(super) fn set_finished(&self, res: Result>) { + let inner = match res { + Ok(code) => Ok(code), + Err(err) => { + if let Some(code) = err.as_exit_code() { + Ok(code) + } else { + Err(err) + } + } + }; + + self.watch.send(TaskStatus::Finished(inner)).ok(); } - pub async fn await_termination(&self) -> Option { - // FIXME: why is this a loop? should not be necessary, - // Should be redundant since the subscriber is created while holding the lock. + pub fn status(&self) -> TaskStatus { + self.watch.borrow().clone() + } + + pub async fn await_termination(&self) -> Result> { + let mut receiver = self.watch.subscribe(); + match &*receiver.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); + } + } loop { - let mut rx = { - let code_opt = self.exit_code.lock().unwrap(); - if code_opt.is_some() { - return *code_opt; + // NOTE: unwrap() is fine, because &self always holds on to the sender. + receiver.changed().await.unwrap(); + match &*receiver.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); } - self.sender.subscribe() - }; - if rx.recv().await.is_err() { - return None; } } } - /// Returns the exit code if the task has finished, and None otherwise. - pub fn get_exit_code(&self) -> Option { - *self.exit_code.lock().unwrap() + pub fn handle(&self) -> TaskJoinHandle { + TaskJoinHandle { + watch: self.watch.subscribe(), + } } } -impl Default for TaskJoinHandle { +impl Default for OwnedTaskStatus { fn default() -> Self { - Self::new() + Self::new(TaskStatus::Pending) + } +} + +/// A handle that allows awaiting the termination of a task, and retrieving its exit code. +#[derive(Clone, Debug)] +pub struct TaskJoinHandle { + watch: tokio::sync::watch::Receiver, +} + +impl TaskJoinHandle { + /// Retrieve the current status. + pub fn status(&self) -> TaskStatus { + self.watch.borrow().clone() + } + + /// Wait until the task finishes. + pub async fn wait_finished(&mut self) -> Result> { + match &*self.watch.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); + } + } + loop { + if self.watch.changed().await.is_err() { + return Ok(Errno::Noent as u32); + } + match &*self.watch.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); + } + } + } } } diff --git a/lib/wasi/src/os/task/thread.rs b/lib/wasi/src/os/task/thread.rs index d43cd3b771b..a57e2af6867 100644 --- a/lib/wasi/src/os/task/thread.rs +++ b/lib/wasi/src/os/task/thread.rs @@ -11,9 +11,15 @@ use wasmer_wasi_types::{ wasi::{Errno, ExitCode}, }; -use crate::os::task::process::{WasiProcessId, WasiProcessInner}; +use crate::{ + os::task::process::{WasiProcessId, WasiProcessInner}, + WasiRuntimeError, +}; -use super::{control_plane::TaskCountGuard, task_join_handle::TaskJoinHandle}; +use super::{ + control_plane::TaskCountGuard, + task_join_handle::{OwnedTaskStatus, TaskJoinHandle}, +}; /// Represents the ID of a WASI thread #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -84,6 +90,31 @@ pub struct WasiThread { state: Arc, } +/// A guard that ensures a thread is marked as terminated when dropped. +/// +/// Normally the thread result should be manually registered with +/// [`Thread::set_status_running`] or [`Thread::set_status_finished`], but +/// this guard can ensure that the thread is marked as terminated even if this +/// is forgotten or a panic occurs. +pub struct WasiThreadGuard { + pub thread: WasiThread, +} + +impl WasiThreadGuard { + pub fn new(thread: WasiThread) -> Self { + Self { thread } + } +} + +impl Drop for WasiThreadGuard { + fn drop(&mut self) { + self.thread + .set_status_finished(Err( + crate::RuntimeError::new("Thread manager disconnected").into() + )); + } +} + #[derive(Debug)] struct WasiThreadState { is_main: bool, @@ -91,7 +122,7 @@ struct WasiThreadState { id: WasiThreadId, signals: Mutex<(Vec, Vec)>, stack: Mutex, - finished: Arc, + status: Arc, // Registers the task termination with the ControlPlane on drop. // Never accessed, since it's a drop guard. @@ -105,7 +136,7 @@ impl WasiThread { pid: WasiProcessId, id: WasiThreadId, is_main: bool, - finished: Arc, + status: Arc, guard: TaskCountGuard, ) -> Self { Self { @@ -113,7 +144,7 @@ impl WasiThread { is_main, pid, id, - finished, + status, signals: Mutex::new((Vec::new(), Vec::new())), stack: Mutex::new(ThreadStack::default()), _task_count_guard: guard, @@ -136,25 +167,34 @@ impl WasiThread { self.state.is_main } + /// Get a join handle to watch the task status. + pub fn join_handle(&self) -> TaskJoinHandle { + self.state.status.handle() + } + // TODO: this should be private, access should go through utility methods. pub fn signals(&self) -> &Mutex<(Vec, Vec)> { &self.state.signals } + pub fn set_status_running(&self) { + self.state.status.set_running(); + } + /// Marks the thread as finished (which will cause anyone that /// joined on it to wake up) - pub fn terminate(&self, exit_code: u32) { - self.state.finished.terminate(exit_code); + pub fn set_status_finished(&self, res: Result) { + self.state.status.set_finished(res.map_err(Arc::new)); } /// Waits until the thread is finished or the timeout is reached - pub async fn join(&self) -> Option { - self.state.finished.await_termination().await + pub async fn join(&self) -> Result> { + self.state.status.await_termination().await } /// Attempts to join on the thread - pub fn try_join(&self) -> Option { - self.state.finished.get_exit_code() + pub fn try_join(&self) -> Option>> { + self.state.status.status().into_finished() } /// Adds a signal for this thread to process @@ -362,7 +402,7 @@ impl Drop for WasiThreadHandle { if let Some(id) = Arc::get_mut(&mut self.id) { let mut inner = self.inner.write().unwrap(); if let Some(ctrl) = inner.threads.remove(id) { - ctrl.terminate(0); + ctrl.set_status_finished(Ok(0)); } inner.thread_count -= 1; } diff --git a/lib/wasi/src/os/tty/mod.rs b/lib/wasi/src/os/tty/mod.rs index c32502886e8..02e29da0edb 100644 --- a/lib/wasi/src/os/tty/mod.rs +++ b/lib/wasi/src/os/tty/mod.rs @@ -3,7 +3,6 @@ use std::{ sync::{Arc, Mutex}, }; -use crate::vbus::SignalHandlerAbi; use derivative::*; use futures::future::BoxFuture; use wasmer_vfs::{AsyncWriteExt, NullFile, VirtualFile}; @@ -11,6 +10,8 @@ use wasmer_wasi_types::wasi::{Signal, Snapshot0Clockid}; use crate::syscalls::platform_clock_time_get; +use super::task::signal::SignalHandlerAbi; + const TTY_MOBILE_PAUSE: u128 = std::time::Duration::from_millis(200).as_nanos(); #[cfg(feature = "host-termios")] @@ -229,7 +230,7 @@ impl Tty { fn on_ctrl_c(mut self, _data: Cow<'static, [u8]>) -> BoxFuture<'static, Self> { Box::pin(async move { if let Some(signaler) = self.signaler.as_ref() { - signaler.signal(Signal::Sigint as u8); + signaler.signal(Signal::Sigint as u8).ok(); let (echo, _line_buffering) = { let options = self.options.inner.lock().unwrap(); diff --git a/lib/wasi/src/state/builder.rs b/lib/wasi/src/state/builder.rs index cf62e6ad99b..4957fb5ea99 100644 --- a/lib/wasi/src/state/builder.rs +++ b/lib/wasi/src/state/builder.rs @@ -752,6 +752,7 @@ impl WasiEnvBuilder { let start = instance.exports.get_function("_start")?; + env.data(store).thread.set_status_running(); let res = crate::run_wasi_func_start(start, store); let exit_code = match &res { diff --git a/lib/wasi/src/state/env.rs b/lib/wasi/src/state/env.rs index 60d50a7a30e..ffb989d0d77 100644 --- a/lib/wasi/src/state/env.rs +++ b/lib/wasi/src/state/env.rs @@ -533,7 +533,7 @@ impl WasiEnv { let signal_cnt = signals.len(); for sig in signals { if sig == Signal::Sigint || sig == Signal::Sigquit || sig == Signal::Sigkill { - env.thread.terminate(Errno::Intr as u32); + env.thread.set_status_finished(Ok(Errno::Intr as u32)); return Err(WasiError::Exit(Errno::Intr as u32)); } else { trace!("wasi[{}]::signal-ignored: {:?}", env.pid(), sig); @@ -562,7 +562,7 @@ impl WasiEnv { .thread .has_signal(&[Signal::Sigint, Signal::Sigquit, Signal::Sigkill]) { - env.thread.terminate(Errno::Intr as u32); + env.thread.set_status_finished(Ok(Errno::Intr as u32)); } return Ok(Ok(false)); } @@ -649,10 +649,10 @@ impl WasiEnv { pub fn should_exit(&self) -> Option { // Check for forced exit if let Some(forced_exit) = self.thread.try_join() { - return Some(forced_exit); + return Some(forced_exit.unwrap_or(Errno::Child as u32)); } if let Some(forced_exit) = self.process.try_join() { - return Some(forced_exit); + return Some(forced_exit.unwrap_or(Errno::Child as u32)); } None } diff --git a/lib/wasi/src/state/mod.rs b/lib/wasi/src/state/mod.rs index fb407ca99af..0fca7b274ea 100644 --- a/lib/wasi/src/state/mod.rs +++ b/lib/wasi/src/state/mod.rs @@ -30,7 +30,6 @@ use std::{ time::Duration, }; -use crate::vbus::VirtualBusInvocation; use derivative::Derivative; #[cfg(feature = "enable-serde")] use serde::{Deserialize, Serialize}; @@ -48,7 +47,6 @@ pub use self::{ pub use crate::fs::{InodeGuard, InodeWeakGuard}; use crate::{ fs::{fs_error_into_wasi_err, WasiFs, WasiFsRoot, WasiInodes, WasiStateFileGuard}, - os::task::process::WasiProcessId, syscalls::types::*, utils::WasiParkingLot, WasiCallingId, @@ -107,12 +105,6 @@ pub struct WasiFutex { pub(crate) wakers: Vec, } -#[derive(Debug)] -pub struct WasiBusCall { - pub bid: WasiProcessId, - pub invocation: Box, -} - /// Structure that holds the state of BUS calls to this process and from /// this process. BUS calls are the equivalent of RPC's with support /// for all the major serializers diff --git a/lib/wasi/src/state/types.rs b/lib/wasi/src/state/types.rs index 381e4654416..7a0fe98f48d 100644 --- a/lib/wasi/src/state/types.rs +++ b/lib/wasi/src/state/types.rs @@ -5,9 +5,10 @@ use cfg_if::cfg_if; #[cfg(feature = "enable-serde")] use serde::{Deserialize, Serialize}; -use crate::vbus::VirtualBusError; use wasmer_wasi_types::wasi::{BusErrno, Rights}; +use crate::VirtualBusError; + cfg_if! { if #[cfg(feature = "host-fs")] { pub use wasmer_vfs::host_fs::{Stderr, Stdin, Stdout}; diff --git a/lib/wasi/src/syscalls/mod.rs b/lib/wasi/src/syscalls/mod.rs index ee2ef1165cb..1aa18162409 100644 --- a/lib/wasi/src/syscalls/mod.rs +++ b/lib/wasi/src/syscalls/mod.rs @@ -62,10 +62,6 @@ pub use unix::*; #[cfg(any(target_family = "wasm"))] pub use wasm::*; -pub(crate) use crate::vbus::{ - BusInvocationEvent, BusSpawnedProcess, SignalHandlerAbi, StdioMode, VirtualBusError, - VirtualBusInvokedWait, -}; pub(crate) use wasmer::{ AsStoreMut, AsStoreRef, Extern, Function, FunctionEnv, FunctionEnvMut, Global, Instance, Memory, Memory32, Memory64, MemoryAccessError, MemoryError, MemorySize, MemoryView, Module, @@ -81,10 +77,10 @@ pub use windows::*; pub(crate) use self::types::{ wasi::{ - Addressfamily, Advice, Bid, BusDataFormat, BusErrno, BusHandles, Cid, Clockid, Dircookie, - Dirent, Errno, Event, EventFdReadwrite, Eventrwflags, Eventtype, ExitCode, Fd as WasiFd, - Fdflags, Fdstat, Filesize, Filestat, Filetype, Fstflags, Linkcount, Longsize, OptionFd, - Pid, Prestat, Rights, Snapshot0Clockid, Sockoption, Sockstatus, Socktype, StackSnapshot, + Addressfamily, Advice, Bid, BusErrno, BusHandles, Cid, Clockid, Dircookie, Dirent, Errno, + Event, EventFdReadwrite, Eventrwflags, Eventtype, ExitCode, Fd as WasiFd, Fdflags, Fdstat, + Filesize, Filestat, Filetype, Fstflags, Linkcount, Longsize, OptionFd, Pid, Prestat, + Rights, Snapshot0Clockid, Sockoption, Sockstatus, Socktype, StackSnapshot, StdioMode as WasiStdioMode, Streamsecurity, Subscription, SubscriptionFsReadwrite, Tid, Timestamp, TlKey, TlUser, TlVal, Tty, Whence, }, @@ -106,7 +102,7 @@ pub(crate) use crate::{ runtime::{task_manager::VirtualTaskManagerExt, SpawnType}, state::{ self, bus_errno_into_vbus_error, iterate_poll_events, vbus_error_into_bus_errno, - InodeGuard, InodeWeakGuard, PollEvent, PollEventBuilder, WasiBusCall, WasiFutex, WasiState, + InodeGuard, InodeWeakGuard, PollEvent, PollEventBuilder, WasiFutex, WasiState, WasiThreadContext, }, utils::{self, map_io_err}, @@ -119,7 +115,7 @@ use crate::{ MAX_SYMLINKS, }, utils::store::InstanceSnapshot, - WasiInodes, + VirtualBusError, WasiInodes, }; pub(crate) use crate::{net::net_error_into_wasi_err, utils::WasiParkingLot}; @@ -1101,12 +1097,3 @@ pub(crate) fn conv_bus_err_to_exit_code(err: VirtualBusError) -> ExitCode { _ => Errno::Inval as ExitCode, } } - -// Function for converting the format -pub(crate) fn conv_bus_format(format: BusDataFormat) -> BusDataFormat { - format -} - -pub(crate) fn conv_bus_format_from(format: BusDataFormat) -> BusDataFormat { - format -} diff --git a/lib/wasi/src/syscalls/wasi/proc_exit.rs b/lib/wasi/src/syscalls/wasi/proc_exit.rs index ce5cbe89c37..83869eacb76 100644 --- a/lib/wasi/src/syscalls/wasi/proc_exit.rs +++ b/lib/wasi/src/syscalls/wasi/proc_exit.rs @@ -20,7 +20,7 @@ pub fn proc_exit( ); // Set the exit code for this process - ctx.data().thread.terminate(code as u32); + ctx.data().thread.set_status_finished(Ok(code as u32)); // If we are in a vfork we need to return to the point we left off if let Some(mut vfork) = ctx.data_mut().vfork.take() { diff --git a/lib/wasi/src/syscalls/wasix/proc_exec.rs b/lib/wasi/src/syscalls/wasix/proc_exec.rs index 5eef36e748d..704bb8e1e18 100644 --- a/lib/wasi/src/syscalls/wasix/proc_exec.rs +++ b/lib/wasi/src/syscalls/wasix/proc_exec.rs @@ -1,5 +1,8 @@ use super::*; -use crate::syscalls::*; +use crate::{ + os::task::{OwnedTaskStatus, TaskStatus}, + syscalls::*, +}; /// Replaces the current process with a new process /// @@ -161,14 +164,14 @@ pub fn proc_exec( ctx.data().tid(), err_exit_code ); - BusSpawnedProcess::exited_process(err_exit_code) + OwnedTaskStatus::new(TaskStatus::Finished(Ok(err_exit_code))).handle() } }; // Add the process to the environment state { let mut inner = ctx.data().process.write(); - inner.bus_processes.insert(child_pid, Box::new(process)); + inner.bus_processes.insert(child_pid, process); } let mut memory_stack = vfork.memory_stack; @@ -270,13 +273,8 @@ pub fn proc_exec( let (tx, rx) = std::sync::mpsc::channel(); let tasks_inner = tasks.clone(); tasks.block_on(Box::pin(async move { - loop { - tasks_inner.sleep_now(Duration::from_millis(5)).await; - if let Some(exit_code) = process.inst.exit_code() { - tx.send(exit_code).unwrap(); - break; - } - } + let code = process.wait_finished().await.unwrap_or(Errno::Child as u32); + tx.send(code); })); let exit_code = rx.recv().unwrap(); OnCalledAction::Trap(Box::new(WasiError::Exit(exit_code as ExitCode))) diff --git a/lib/wasi/src/syscalls/wasix/proc_fork.rs b/lib/wasi/src/syscalls/wasix/proc_fork.rs index ac4bdf3dfd5..22ba49086df 100644 --- a/lib/wasi/src/syscalls/wasix/proc_fork.rs +++ b/lib/wasi/src/syscalls/wasix/proc_fork.rs @@ -1,5 +1,5 @@ use super::*; -use crate::syscalls::*; +use crate::{os::task::OwnedTaskStatus, syscalls::*}; #[cfg(feature = "sys")] use wasmer::vm::VMMemory; @@ -331,18 +331,9 @@ pub fn proc_fork( }; // Add the process to the environment state - let process = BusSpawnedProcess { - inst: Box::new(crate::bin_factory::SpawnedProcess { - exit_code: Mutex::new(None), - exit_code_rx: Mutex::new(exit_code_rx), - }), - stdin: None, - stdout: None, - stderr: None, - signaler: Some(signaler), - module_memory_footprint: 0, - file_system_memory_footprint: 0, - }; + + let process = OwnedTaskStatus::default(); + { trace!( "wasi[{}:{}]::spawned sub-process (pid={})", @@ -351,7 +342,7 @@ pub fn proc_fork( child_pid.raw() ); let mut inner = ctx.data().process.write(); - inner.bus_processes.insert(child_pid, Box::new(process)); + inner.bus_processes.insert(child_pid, process.handle()); } // If the return value offset is within the memory stack then we need diff --git a/lib/wasi/src/syscalls/wasix/proc_join.rs b/lib/wasi/src/syscalls/wasix/proc_join.rs index e886ba2b3a2..f55839be17d 100644 --- a/lib/wasi/src/syscalls/wasix/proc_join.rs +++ b/lib/wasi/src/syscalls/wasix/proc_join.rs @@ -66,7 +66,8 @@ pub fn proc_join( let process = env.process.control_plane().get_process(pid); if let Some(process) = process { let exit_code = wasi_try_ok!(__asyncify(&mut ctx, None, async move { - process.join().await.ok_or(Errno::Child) + let code = process.join().await.unwrap_or(Errno::Child as u32); + Ok(code) })?); trace!("child ({}) exited with {}", pid.raw(), exit_code); diff --git a/lib/wasi/src/syscalls/wasix/proc_spawn.rs b/lib/wasi/src/syscalls/wasix/proc_spawn.rs index c679c179328..84bfa4d0b0a 100644 --- a/lib/wasi/src/syscalls/wasix/proc_spawn.rs +++ b/lib/wasi/src/syscalls/wasix/proc_spawn.rs @@ -267,7 +267,7 @@ pub fn proc_spawn_internal( { let mut guard = env.process.write(); - guard.bus_processes.insert(child_pid, Box::new(process)); + guard.bus_processes.insert(child_pid, process); }; let handles = BusHandles { diff --git a/lib/wasi/src/vbus.rs b/lib/wasi/src/vbus.rs deleted file mode 100644 index 932c20ccca3..00000000000 --- a/lib/wasi/src/vbus.rs +++ /dev/null @@ -1,400 +0,0 @@ -use std::fmt; -use std::future::Future; -use std::ops::DerefMut; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; -use thiserror::Error; - -pub use wasmer_vfs::StdioMode; -use wasmer_vfs::VirtualFile; -use wasmer_wasi_types::wasi::{BusDataFormat, ExitCode}; - -enum BusSpawnedProcessJoinResult { - Active(Box), - Finished(Option), -} - -#[derive(Clone)] -pub struct BusSpawnedProcessJoin { - inst: Arc>, -} - -impl BusSpawnedProcessJoin { - pub fn new(process: BusSpawnedProcess) -> Self { - Self { - inst: Arc::new(Mutex::new(BusSpawnedProcessJoinResult::Active( - process.inst, - ))), - } - } - - pub fn poll_finished(&self, cx: &mut Context<'_>) -> Poll> { - let mut guard = self.inst.lock().unwrap(); - match guard.deref_mut() { - BusSpawnedProcessJoinResult::Active(inst) => { - let pinned_inst = Pin::new(inst.as_mut()); - match pinned_inst.poll_ready(cx) { - Poll::Ready(_) => { - let exit_code = inst.exit_code(); - let mut swap = BusSpawnedProcessJoinResult::Finished(exit_code); - std::mem::swap(guard.deref_mut(), &mut swap); - Poll::Ready(exit_code) - } - Poll::Pending => Poll::Pending, - } - } - BusSpawnedProcessJoinResult::Finished(exit_code) => Poll::Ready(*exit_code), - } - } -} - -impl Future for BusSpawnedProcessJoin { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.poll_finished(cx) - } -} - -impl std::fmt::Debug for BusSpawnedProcessJoin { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BusSpawnedProcessJoin").finish() - } -} - -/// Signal handles...well...they process signals -pub trait SignalHandlerAbi -where - Self: std::fmt::Debug, -{ - /// Processes a signal - fn signal(&self, sig: u8); -} - -#[derive(Debug)] -pub struct BusSpawnedProcess { - /// Reference to the spawned instance - pub inst: Box, - /// Virtual file used for stdin - pub stdin: Option>, - /// Virtual file used for stdout - pub stdout: Option>, - /// Virtual file used for stderr - pub stderr: Option>, - /// The signal handler for this process (if any) - pub signaler: Option>, - /// Amount of memory that the module uses - pub module_memory_footprint: u64, - /// Combined memory uses by the module and the file system - pub file_system_memory_footprint: u64, -} - -impl BusSpawnedProcess { - pub fn exited_process(exit_code: ExitCode) -> Self { - Self { - inst: Box::new(ExitedProcess { exit_code }), - stdin: None, - stdout: None, - stderr: None, - signaler: None, - module_memory_footprint: 0, - file_system_memory_footprint: 0, - } - } -} - -pub trait VirtualBusScope: fmt::Debug + Send + Sync + 'static { - //// Returns true if the invokable target has finished - fn poll_finished(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>; -} - -pub trait VirtualBusInvokable: fmt::Debug + Send + Sync + 'static { - /// Invokes a service within this instance - #[allow(unused_variables)] - fn invoke( - &self, - topic_hash: u128, - format: BusDataFormat, - buf: Vec, - ) -> Box { - Box::new(UnsupportedBusInvoker::default()) - } -} - -#[derive(Debug, Default)] -struct UnsupportedBusInvoker {} - -impl VirtualBusInvoked for UnsupportedBusInvoker { - #[allow(unused_variables)] - fn poll_invoked( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, VirtualBusError>> { - Poll::Ready(Err(VirtualBusError::Unsupported)) - } -} - -pub trait VirtualBusInvoked: fmt::Debug + Unpin + 'static { - //// Returns once the bus has been invoked (or failed) - fn poll_invoked( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, VirtualBusError>>; -} -pub struct VirtualBusInvokedWait { - invoked: Box, -} -impl VirtualBusInvokedWait { - pub fn new(invoked: Box) -> Self { - Self { invoked } - } -} -impl Future for VirtualBusInvokedWait { - type Output = Result, VirtualBusError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let invoked = Pin::new(self.invoked.deref_mut()); - invoked.poll_invoked(cx) - } -} - -pub trait VirtualBusProcess: - VirtualBusScope + VirtualBusInvokable + fmt::Debug + Send + Sync + 'static -{ - /// Returns the exit code if the instance has finished - fn exit_code(&self) -> Option; - - /// Polls to check if the process is ready yet to receive commands - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>; -} - -pub trait VirtualBusInvocation: - VirtualBusInvokable + fmt::Debug + Send + Sync + Unpin + 'static -{ - /// Polls for new listen events related to this context - fn poll_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; -} - -#[derive(Debug)] -pub struct InstantInvocation { - val: Option, - err: Option, - call: Option>, -} - -impl InstantInvocation { - pub fn response(format: BusDataFormat, data: Vec) -> Self { - Self { - val: Some(BusInvocationEvent::Response { format, data }), - err: None, - call: None, - } - } - - pub fn fault(err: VirtualBusError) -> Self { - Self { - val: None, - err: Some(err), - call: None, - } - } - - pub fn call(val: Box) -> Self { - Self { - val: None, - err: None, - call: Some(val), - } - } -} - -impl VirtualBusInvoked for InstantInvocation { - fn poll_invoked( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, VirtualBusError>> { - if let Some(err) = self.err.take() { - return Poll::Ready(Err(err)); - } - if let Some(val) = self.val.take() { - return Poll::Ready(Ok(Box::new(InstantInvocation { - val: Some(val), - err: None, - call: None, - }))); - } - match self.call.take() { - Some(val) => Poll::Ready(Ok(val)), - None => Poll::Ready(Err(VirtualBusError::AlreadyConsumed)), - } - } -} - -impl VirtualBusInvocation for InstantInvocation { - fn poll_event(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - match self.val.take() { - Some(val) => Poll::Ready(val), - None => Poll::Ready(BusInvocationEvent::Fault { - fault: VirtualBusError::AlreadyConsumed, - }), - } - } -} - -impl VirtualBusInvokable for InstantInvocation { - fn invoke( - &self, - _topic_hash: u128, - _format: BusDataFormat, - _buf: Vec, - ) -> Box { - Box::new(InstantInvocation { - val: None, - err: Some(VirtualBusError::InvalidTopic), - call: None, - }) - } -} - -#[derive(Debug)] -pub enum BusInvocationEvent { - /// The server has sent some out-of-band data to you - Callback { - /// Topic that this call relates to - topic_hash: u128, - /// Format of the data we received - format: BusDataFormat, - /// Data passed in the call - data: Vec, - }, - /// The service has a responded to your call - Response { - /// Format of the data we received - format: BusDataFormat, - /// Data returned by the call - data: Vec, - }, - /// The service has responded with a fault - Fault { - /// Fault code that was raised - fault: VirtualBusError, - }, -} - -pub trait VirtualBusListener: fmt::Debug + Send + Sync + Unpin + 'static { - /// Polls for new calls to this service - fn poll(self: Pin<&Self>, cx: &mut Context<'_>) -> Poll; -} - -#[derive(Debug)] -pub struct BusCallEvent { - /// Topic hash that this call relates to - pub topic_hash: u128, - /// Reference to the call itself - pub called: Box, - /// Format of the data we received - pub format: BusDataFormat, - /// Data passed in the call - pub data: Vec, -} - -pub trait VirtualBusCalled: fmt::Debug + Send + Sync + 'static { - /// Polls for new calls to this service - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; - - /// Sends an out-of-band message back to the caller - fn callback(&self, topic_hash: u128, format: BusDataFormat, buf: Vec); - - /// Informs the caller that their call has failed - fn fault(self: Box, fault: VirtualBusError); - - /// Finishes the call and returns a particular response - fn reply(&self, format: BusDataFormat, buf: Vec); -} - -#[derive(Error, Copy, Clone, Debug, PartialEq, Eq)] -pub enum VirtualBusError { - /// Failed during serialization - #[error("serialization failed")] - Serialization, - /// Failed during deserialization - #[error("deserialization failed")] - Deserialization, - /// Invalid WAPM process - #[error("invalid wapm")] - InvalidWapm, - /// Failed to fetch the WAPM process - #[error("fetch failed")] - FetchFailed, - /// Failed to compile the WAPM process - #[error("compile error")] - CompileError, - /// Invalid ABI - #[error("WAPM process has an invalid ABI")] - InvalidABI, - /// Call was aborted - #[error("call aborted")] - Aborted, - /// Bad handle - #[error("bad handle")] - BadHandle, - /// Invalid topic - #[error("invalid topic")] - InvalidTopic, - /// Invalid callback - #[error("invalid callback")] - BadCallback, - /// Call is unsupported - #[error("unsupported")] - Unsupported, - /// Not found - #[error("not found")] - NotFound, - /// Bad request - #[error("bad request")] - BadRequest, - /// Access denied - #[error("access denied")] - AccessDenied, - /// Internal error has occured - #[error("internal error")] - InternalError, - /// Memory allocation failed - #[error("memory allocation failed")] - MemoryAllocationFailed, - /// Invocation has failed - #[error("invocation has failed")] - InvokeFailed, - /// Already consumed - #[error("already consumed")] - AlreadyConsumed, - /// Memory access violation - #[error("memory access violation")] - MemoryAccessViolation, - /// Some other unhandled error. If you see this, it's probably a bug. - #[error("unknown error found")] - UnknownError, -} - -#[derive(Debug)] -pub struct ExitedProcess { - pub exit_code: ExitCode, -} - -impl VirtualBusProcess for ExitedProcess { - fn exit_code(&self) -> Option { - Some(self.exit_code) - } - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { - Poll::Ready(()) - } -} - -impl VirtualBusScope for ExitedProcess { - fn poll_finished(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - VirtualBusProcess::poll_ready(self, cx) - } -} - -impl VirtualBusInvokable for ExitedProcess {} diff --git a/lib/wasi/tests/stdio.rs b/lib/wasi/tests/stdio.rs index d7899f68e72..e1480720330 100644 --- a/lib/wasi/tests/stdio.rs +++ b/lib/wasi/tests/stdio.rs @@ -43,7 +43,7 @@ mod sys { async fn test_stdout() { let mut store = Store::default(); - let module = Module::new(&mut store, br#" + let module = Module::new(&store, br#" (module ;; Import the required fd_write WASI function which will write the given io vectors to stdout ;; The function signature for fd_write is: