From bc2a2dd4c26511cd959b7e4a0e26aa88adb25a67 Mon Sep 17 00:00:00 2001 From: Christoph Herzog Date: Sat, 25 Feb 2023 08:39:43 +0100 Subject: [PATCH] wasi: Remove vbus leftovers and improve task joining * A new TaskStatus is introduced that specifies the current state of a task (usually a process or thread) * Status tracking is refactored to use tokio::sync::watch instead of a homegrown solution which combined channels and mutexes * The task status now not just tracks an exit code, but also the ocurred error This helps consumers of the task status to know that and error ocurred, and which one, which helps debugging / logging and potential recovery * Remove the unused VBUS crate leftover types and traits Almost all gone, except for the VirtualBusError * Move SignalHandlerAbi types to os::signal --- lib/wasi-types/src/wasi/bindings.rs | 150 +------ lib/wasi-types/src/wasi/bindings_manual.rs | 6 - lib/wasi/src/bin_factory/exec.rs | 161 ++----- lib/wasi/src/bin_factory/mod.rs | 1 - lib/wasi/src/lib.rs | 68 ++- .../src/os/command/builtins/cmd_wasmer.rs | 18 +- lib/wasi/src/os/command/mod.rs | 15 +- lib/wasi/src/os/console/mod.rs | 9 +- lib/wasi/src/os/task/mod.rs | 6 +- lib/wasi/src/os/task/process.rs | 87 ++-- lib/wasi/src/os/task/signal.rs | 13 + lib/wasi/src/os/task/task_join_handle.rs | 182 ++++++-- lib/wasi/src/os/task/thread.rs | 64 ++- lib/wasi/src/os/tty/mod.rs | 5 +- lib/wasi/src/state/builder.rs | 1 + lib/wasi/src/state/env.rs | 8 +- lib/wasi/src/state/mod.rs | 8 - lib/wasi/src/state/types.rs | 3 +- lib/wasi/src/syscalls/mod.rs | 25 +- lib/wasi/src/syscalls/wasi/proc_exit.rs | 2 +- lib/wasi/src/syscalls/wasix/proc_exec.rs | 18 +- lib/wasi/src/syscalls/wasix/proc_fork.rs | 19 +- lib/wasi/src/syscalls/wasix/proc_join.rs | 3 +- lib/wasi/src/syscalls/wasix/proc_spawn.rs | 2 +- lib/wasi/src/vbus.rs | 400 ------------------ lib/wasi/tests/stdio.rs | 2 +- 26 files changed, 427 insertions(+), 849 deletions(-) delete mode 100644 lib/wasi/src/vbus.rs diff --git a/lib/wasi-types/src/wasi/bindings.rs b/lib/wasi-types/src/wasi/bindings.rs index 9dc9f012bd4..1f31133b322 100644 --- a/lib/wasi-types/src/wasi/bindings.rs +++ b/lib/wasi-types/src/wasi/bindings.rs @@ -1117,52 +1117,7 @@ impl core::fmt::Debug for Tty { .finish() } } -#[repr(u8)] -#[derive(Clone, Copy, PartialEq, Eq)] -pub enum BusDataFormat { - Raw, - Bincode, - MessagePack, - Json, - Yaml, - Xml, - Rkyv, -} -impl core::fmt::Debug for BusDataFormat { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - BusDataFormat::Raw => f.debug_tuple("BusDataFormat::Raw").finish(), - BusDataFormat::Bincode => f.debug_tuple("BusDataFormat::Bincode").finish(), - BusDataFormat::MessagePack => f.debug_tuple("BusDataFormat::MessagePack").finish(), - BusDataFormat::Json => f.debug_tuple("BusDataFormat::Json").finish(), - BusDataFormat::Yaml => f.debug_tuple("BusDataFormat::Yaml").finish(), - BusDataFormat::Xml => f.debug_tuple("BusDataFormat::Xml").finish(), - BusDataFormat::Rkyv => f.debug_tuple("BusDataFormat::Rkyv").finish(), - } - } -} -#[repr(u8)] -#[derive(Clone, Copy, PartialEq, Eq)] -pub enum BusEventType { - Noop, - Exit, - Call, - Result, - Fault, - Close, -} -impl core::fmt::Debug for BusEventType { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - match self { - BusEventType::Noop => f.debug_tuple("BusEventType::Noop").finish(), - BusEventType::Exit => f.debug_tuple("BusEventType::Exit").finish(), - BusEventType::Call => f.debug_tuple("BusEventType::Call").finish(), - BusEventType::Result => f.debug_tuple("BusEventType::Result").finish(), - BusEventType::Fault => f.debug_tuple("BusEventType::Fault").finish(), - BusEventType::Close => f.debug_tuple("BusEventType::Close").finish(), - } - } -} + pub type Bid = u32; pub type Cid = u64; #[doc = " __wasi_option_t"] @@ -2389,34 +2344,6 @@ impl core::fmt::Debug for Timeout { } } } -#[repr(C)] -#[derive(Copy, Clone)] -pub struct BusEvent { - pub tag: BusEventType, - pub padding: (u64, u64, u64, u64, u64, u64, u64, u32, u16, u8), -} -impl core::fmt::Debug for BusEvent { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("BusEvent") - .field("tag", &self.tag) - .field("padding", &self.padding) - .finish() - } -} -#[repr(C)] -#[derive(Copy, Clone)] -pub struct BusEvent2 { - pub tag: BusEventType, - pub event: BusEvent, -} -impl core::fmt::Debug for BusEvent2 { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - f.debug_struct("BusEvent2") - .field("tag", &self.tag) - .field("event", &self.event) - .finish() - } -} // TODO: if necessary, must be implemented in wit-bindgen unsafe impl ValueType for Snapshot0Clockid { @@ -3068,69 +2995,6 @@ unsafe impl ValueType for Tty { fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} } -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusDataFormat { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} - -unsafe impl wasmer::FromToNativeWasmType for BusDataFormat { - type Native = i32; - - fn to_native(self) -> Self::Native { - self as i32 - } - - fn from_native(n: Self::Native) -> Self { - match n { - 0 => Self::Raw, - 1 => Self::Bincode, - 2 => Self::MessagePack, - 3 => Self::Json, - 4 => Self::Yaml, - 5 => Self::Xml, - 6 => Self::Rkyv, - - q => todo!("could not serialize number {q} to enum BusDataFormat"), - } - } - - fn is_from_store(&self, _store: &impl wasmer::AsStoreRef) -> bool { - false - } -} - -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusEventType { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} - -unsafe impl wasmer::FromToNativeWasmType for BusEventType { - type Native = i32; - - fn to_native(self) -> Self::Native { - self as i32 - } - - fn from_native(n: Self::Native) -> Self { - match n { - 0 => Self::Noop, - 1 => Self::Exit, - 2 => Self::Call, - 3 => Self::Result, - 4 => Self::Fault, - 5 => Self::Close, - - q => todo!("could not serialize number {q} to enum BusEventType"), - } - } - - fn is_from_store(&self, _store: &impl wasmer::AsStoreRef) -> bool { - false - } -} - // TODO: if necessary, must be implemented in wit-bindgen unsafe impl ValueType for OptionTag { #[inline] @@ -3683,15 +3547,3 @@ unsafe impl wasmer::FromToNativeWasmType for Timeout { false } } - -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusEvent { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} - -// TODO: if necessary, must be implemented in wit-bindgen -unsafe impl ValueType for BusEvent2 { - #[inline] - fn zero_padding_bytes(&self, _bytes: &mut [MaybeUninit]) {} -} diff --git a/lib/wasi-types/src/wasi/bindings_manual.rs b/lib/wasi-types/src/wasi/bindings_manual.rs index 53651763aa2..387471f1106 100644 --- a/lib/wasi-types/src/wasi/bindings_manual.rs +++ b/lib/wasi-types/src/wasi/bindings_manual.rs @@ -122,12 +122,6 @@ impl From for SubscriptionClock { } } -impl std::fmt::Display for BusDataFormat { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - impl std::fmt::Display for Sockoption { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let s = match *self { diff --git a/lib/wasi/src/bin_factory/exec.rs b/lib/wasi/src/bin_factory/exec.rs index 0e7fcaedb2b..047d1a7a6cf 100644 --- a/lib/wasi/src/bin_factory/exec.rs +++ b/lib/wasi/src/bin_factory/exec.rs @@ -1,22 +1,17 @@ -use std::{ - ops::DerefMut, - pin::Pin, - sync::{Arc, Mutex}, - task::{Context, Poll}, -}; +use std::{pin::Pin, sync::Arc}; -use crate::vbus::{ - BusSpawnedProcess, VirtualBusError, VirtualBusInvokable, VirtualBusProcess, VirtualBusScope, +use crate::{ + os::task::{thread::WasiThreadGuard, TaskJoinHandle}, + VirtualBusError, WasiRuntimeError, }; use futures::Future; -use tokio::sync::mpsc; use tracing::*; use wasmer::{FunctionEnvMut, Instance, Memory, Module, Store}; use wasmer_wasi_types::wasi::{Errno, ExitCode}; use super::{BinFactory, BinaryPackage, ModuleCache}; use crate::{ - import_object_for_all_wasi_versions, runtime::SpawnType, SpawnedMemory, WasiEnv, WasiError, + import_object_for_all_wasi_versions, runtime::SpawnType, SpawnedMemory, WasiEnv, WasiFunctionEnv, WasiRuntime, }; @@ -27,7 +22,7 @@ pub fn spawn_exec( env: WasiEnv, runtime: &Arc, compiled_modules: &ModuleCache, -) -> Result { +) -> Result { // Load the module #[cfg(feature = "sys")] let compiler = store.engine().name(); @@ -69,12 +64,7 @@ pub fn spawn_exec( env.state.fs.conditional_union(&binary); // Now run the module - let mut ret = spawn_exec_module(module, store, env, runtime); - if let Ok(ret) = ret.as_mut() { - ret.module_memory_footprint = binary.module_memory_footprint; - ret.file_system_memory_footprint = binary.file_system_memory_footprint; - } - ret + spawn_exec_module(module, store, env, runtime) } pub fn spawn_exec_module( @@ -82,16 +72,14 @@ pub fn spawn_exec_module( store: Store, env: WasiEnv, runtime: &Arc, -) -> Result { +) -> Result { // Create a new task manager let tasks = runtime.task_manager(); // Create the signaler let pid = env.pid(); - let signaler = Box::new(env.process.clone()); - // Now run the binary - let (exit_code_tx, exit_code_rx) = mpsc::unbounded_channel(); + let join_handle = env.thread.join_handle(); { // Determine if shared memory needs to be created and imported let shared_memory = module.imports().memories().next().map(|a| *a.ty()); @@ -131,6 +119,8 @@ pub fn spawn_exec_module( } }; + let thread = WasiThreadGuard::new(wasi_env.thread.clone()); + let mut wasi_env = WasiFunctionEnv::new(&mut store, wasi_env); // Let's instantiate the module with the imports. @@ -167,19 +157,8 @@ pub fn spawn_exec_module( // If this module exports an _initialize function, run that first. if let Ok(initialize) = instance.exports.get_function("_initialize") { - if let Err(e) = initialize.call(&mut store, &[]) { - let code = match e.downcast::() { - Ok(WasiError::Exit(code)) => code as ExitCode, - Ok(WasiError::UnknownWasiVersion) => { - debug!("wasi[{}]::exec-failed: unknown wasi version", pid); - Errno::Noexec as ExitCode - } - Err(err) => { - debug!("wasi[{}]::exec-failed: runtime error - {}", pid, err); - Errno::Noexec as ExitCode - } - }; - let _ = exit_code_tx.send(code); + if let Err(err) = initialize.call(&mut store, &[]) { + thread.thread.set_status_finished(Err(err.into())); wasi_env .data(&store) .cleanup(Some(Errno::Noexec as ExitCode)); @@ -193,33 +172,29 @@ pub fn spawn_exec_module( // If there is a start function debug!("wasi[{}]::called main()", pid); // TODO: rewrite to use crate::run_wasi_func + + thread.thread.set_status_running(); + let ret = if let Some(start) = start { - match start.call(&mut store, &[]) { - Ok(_) => 0, - Err(e) => match e.downcast::() { - Ok(WasiError::Exit(code)) => code, - Ok(WasiError::UnknownWasiVersion) => { - debug!("wasi[{}]::exec-failed: unknown wasi version", pid); - Errno::Noexec as u32 - } - Err(err) => { - debug!("wasi[{}]::exec-failed: runtime error - {}", pid, err); - 9999u32 - } - }, - } + start + .call(&mut store, &[]) + .map_err(WasiRuntimeError::from) + .map(|_| 0) } else { debug!("wasi[{}]::exec-failed: missing _start function", pid); - Errno::Noexec as u32 + Ok(Errno::Noexec as u32) }; - debug!("wasi[{}]::main() has exited with {}", pid, ret); + debug!("wasi[{pid}]::main() has exited with {ret:?}"); - // Cleanup the environment - wasi_env.data(&store).cleanup(Some(ret)); + let code = if let Err(err) = &ret { + err.as_exit_code().unwrap_or(Errno::Child as u32) + } else { + 0 + }; + thread.thread.set_status_finished(ret); - // Send the result - let _ = exit_code_tx.send(ret); - drop(exit_code_tx); + // Cleanup the environment + wasi_env.data(&store).cleanup(Some(code)); } }; @@ -247,19 +222,7 @@ pub fn spawn_exec_module( })? }; - let inst = Box::new(SpawnedProcess { - exit_code: Mutex::new(None), - exit_code_rx: Mutex::new(exit_code_rx), - }); - Ok(BusSpawnedProcess { - inst, - stdin: None, - stdout: None, - stderr: None, - signaler: Some(signaler), - module_memory_footprint: 0, - file_system_memory_footprint: 0, - }) + Ok(join_handle) } impl BinFactory { @@ -268,7 +231,7 @@ impl BinFactory { name: String, store: Store, env: WasiEnv, - ) -> Pin> + 'a>> { + ) -> Pin> + 'a>> { Box::pin(async move { // Find the binary (or die trying) and make the spawn type let binary = self @@ -298,7 +261,7 @@ impl BinFactory { parent_ctx: Option<&FunctionEnvMut<'_, WasiEnv>>, store: &mut Option, builder: &mut Option, - ) -> Result { + ) -> Result { // We check for built in commands if let Some(parent_ctx) = parent_ctx { if self.commands.exists(name.as_str()) { @@ -312,61 +275,3 @@ impl BinFactory { Err(VirtualBusError::NotFound) } } - -#[derive(Debug)] -pub(crate) struct SpawnedProcess { - pub exit_code: Mutex>, - pub exit_code_rx: Mutex>, -} - -impl VirtualBusProcess for SpawnedProcess { - fn exit_code(&self) -> Option { - let mut exit_code = self.exit_code.lock().unwrap(); - if let Some(exit_code) = exit_code.as_ref() { - return Some(*exit_code); - } - let mut rx = self.exit_code_rx.lock().unwrap(); - match rx.try_recv() { - Ok(code) => { - exit_code.replace(code); - Some(code) - } - Err(mpsc::error::TryRecvError::Disconnected) => { - let code = Errno::Canceled as ExitCode; - exit_code.replace(code); - Some(code) - } - _ => None, - } - } - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - { - let exit_code = self.exit_code.lock().unwrap(); - if exit_code.is_some() { - return Poll::Ready(()); - } - } - let mut rx = self.exit_code_rx.lock().unwrap(); - let mut rx = Pin::new(rx.deref_mut()); - match rx.poll_recv(cx) { - Poll::Ready(code) => { - let code = code.unwrap_or(Errno::Canceled as ExitCode); - { - let mut exit_code = self.exit_code.lock().unwrap(); - exit_code.replace(code); - } - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } -} - -impl VirtualBusScope for SpawnedProcess { - fn poll_finished(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - VirtualBusProcess::poll_ready(self, cx) - } -} - -impl VirtualBusInvokable for SpawnedProcess {} diff --git a/lib/wasi/src/bin_factory/mod.rs b/lib/wasi/src/bin_factory/mod.rs index a54bfebb002..d39becba4d3 100644 --- a/lib/wasi/src/bin_factory/mod.rs +++ b/lib/wasi/src/bin_factory/mod.rs @@ -10,7 +10,6 @@ mod binary_package; mod exec; mod module_cache; -pub(crate) use exec::SpawnedProcess; use sha2::*; pub use self::{ diff --git a/lib/wasi/src/lib.rs b/lib/wasi/src/lib.rs index 5757a4d4b0e..cb513f3581a 100644 --- a/lib/wasi/src/lib.rs +++ b/lib/wasi/src/lib.rs @@ -44,7 +44,6 @@ pub mod runtime; mod state; mod syscalls; mod utils; -pub mod vbus; pub mod wapm; /// WAI based bindings. @@ -70,7 +69,6 @@ use wasmer::{ MemorySize, RuntimeError, }; -pub use crate::vbus::BusSpawnedProcessJoin; pub use wasmer_vfs; #[deprecated(since = "2.1.0", note = "Please use `wasmer_vfs::FsError`")] pub use wasmer_vfs::FsError as WasiFsError; @@ -154,6 +152,72 @@ impl From for u32 { pub const DEFAULT_STACK_SIZE: u64 = 1_048_576u64; pub const DEFAULT_STACK_BASE: u64 = DEFAULT_STACK_SIZE; +// TODO: remove, this is a leftover from an old vbus crate and should be folded +// into WasiRuntimeError. +#[derive(Error, Copy, Clone, Debug, PartialEq, Eq)] +pub enum VirtualBusError { + /// Failed during serialization + #[error("serialization failed")] + Serialization, + /// Failed during deserialization + #[error("deserialization failed")] + Deserialization, + /// Invalid WAPM process + #[error("invalid wapm")] + InvalidWapm, + /// Failed to fetch the WAPM process + #[error("fetch failed")] + FetchFailed, + /// Failed to compile the WAPM process + #[error("compile error")] + CompileError, + /// Invalid ABI + #[error("WAPM process has an invalid ABI")] + InvalidABI, + /// Call was aborted + #[error("call aborted")] + Aborted, + /// Bad handle + #[error("bad handle")] + BadHandle, + /// Invalid topic + #[error("invalid topic")] + InvalidTopic, + /// Invalid callback + #[error("invalid callback")] + BadCallback, + /// Call is unsupported + #[error("unsupported")] + Unsupported, + /// Not found + #[error("not found")] + NotFound, + /// Bad request + #[error("bad request")] + BadRequest, + /// Access denied + #[error("access denied")] + AccessDenied, + /// Internal error has occured + #[error("internal error")] + InternalError, + /// Memory allocation failed + #[error("memory allocation failed")] + MemoryAllocationFailed, + /// Invocation has failed + #[error("invocation has failed")] + InvokeFailed, + /// Already consumed + #[error("already consumed")] + AlreadyConsumed, + /// Memory access violation + #[error("memory access violation")] + MemoryAccessViolation, + /// Some other unhandled error. If you see this, it's probably a bug. + #[error("unknown error found")] + UnknownError, +} + #[derive(thiserror::Error, Debug)] pub enum WasiRuntimeError { #[error("WASI state setup failed")] diff --git a/lib/wasi/src/os/command/builtins/cmd_wasmer.rs b/lib/wasi/src/os/command/builtins/cmd_wasmer.rs index b897f617ffb..eb184410d13 100644 --- a/lib/wasi/src/os/command/builtins/cmd_wasmer.rs +++ b/lib/wasi/src/os/command/builtins/cmd_wasmer.rs @@ -1,6 +1,9 @@ use std::{any::Any, ops::Deref, sync::Arc}; -use crate::vbus::{BusSpawnedProcess, VirtualBusError}; +use crate::{ + os::task::{OwnedTaskStatus, TaskJoinHandle}, + VirtualBusError, +}; use wasmer::{FunctionEnvMut, Store}; use wasmer_wasi_types::wasi::Errno; @@ -59,7 +62,7 @@ impl CmdWasmer { config: &mut Option, what: Option, mut args: Vec, - ) -> Result { + ) -> Result { if let Some(what) = what { let store = store.take().ok_or(VirtualBusError::UnknownError)?; let mut env = config.take().ok_or(VirtualBusError::UnknownError)?; @@ -83,13 +86,15 @@ impl CmdWasmer { ) .await; }); - Ok(BusSpawnedProcess::exited_process(Errno::Noent as u32)) + let handle = OwnedTaskStatus::new_finished_with_code(Errno::Noent as u32).handle(); + Ok(handle) } } else { parent_ctx.data().tasks().block_on(async move { let _ = stderr_write(parent_ctx, HELP_RUN.as_bytes()).await; }); - Ok(BusSpawnedProcess::exited_process(0)) + let handle = OwnedTaskStatus::new_finished_with_code(0).handle(); + Ok(handle) } } @@ -118,7 +123,7 @@ impl VirtualCommand for CmdWasmer { name: &str, store: &mut Option, env: &mut Option, - ) -> Result { + ) -> Result { // Read the command we want to run let env_inner = env.as_ref().ok_or(VirtualBusError::UnknownError)?; let mut args = env_inner.state.args.iter().map(|a| a.as_str()); @@ -136,7 +141,8 @@ impl VirtualCommand for CmdWasmer { parent_ctx.data().tasks().block_on(async move { let _ = stderr_write(parent_ctx, HELP.as_bytes()).await; }); - Ok(BusSpawnedProcess::exited_process(0)) + let handle = OwnedTaskStatus::new_finished_with_code(0).handle(); + Ok(handle) } Some(what) => { let what = Some(what.to_string()); diff --git a/lib/wasi/src/os/command/mod.rs b/lib/wasi/src/os/command/mod.rs index e2a2f0db55a..dbe78f9c125 100644 --- a/lib/wasi/src/os/command/mod.rs +++ b/lib/wasi/src/os/command/mod.rs @@ -2,11 +2,14 @@ pub mod builtins; use std::{collections::HashMap, sync::Arc}; -use crate::vbus::{BusSpawnedProcess, VirtualBusError}; use wasmer::{FunctionEnvMut, Store}; use wasmer_wasi_types::wasi::Errno; -use crate::{bin_factory::ModuleCache, syscalls::stderr_write, WasiEnv, WasiRuntime}; +use crate::{ + bin_factory::ModuleCache, syscalls::stderr_write, VirtualBusError, WasiEnv, WasiRuntime, +}; + +use super::task::{OwnedTaskStatus, TaskJoinHandle, TaskStatus}; /// A command available to an OS environment. pub trait VirtualCommand @@ -26,7 +29,7 @@ where path: &str, store: &mut Option, config: &mut Option, - ) -> Result; + ) -> Result; } #[derive(Debug, Clone)] @@ -88,7 +91,7 @@ impl Commands { path: &str, store: &mut Option, builder: &mut Option, - ) -> Result { + ) -> Result { let path = path.to_string(); if let Some(cmd) = self.commands.get(&path) { cmd.exec(parent_ctx, path.as_str(), store, builder) @@ -97,7 +100,9 @@ impl Commands { parent_ctx, format!("wasm command unknown - {}\r\n", path).as_bytes(), ); - Ok(BusSpawnedProcess::exited_process(Errno::Noent as u32)) + + let res = OwnedTaskStatus::new(TaskStatus::Finished(Ok(Errno::Noent as u32))); + Ok(res.handle()) } } } diff --git a/lib/wasi/src/os/console/mod.rs b/lib/wasi/src/os/console/mod.rs index a271e4764e6..9fca12aaca2 100644 --- a/lib/wasi/src/os/console/mod.rs +++ b/lib/wasi/src/os/console/mod.rs @@ -11,7 +11,6 @@ use std::{ sync::{atomic::AtomicBool, Arc, Mutex}, }; -use crate::vbus::{BusSpawnedProcess, VirtualBusError}; use derivative::*; use linked_hash_set::LinkedHashSet; use tokio::sync::{mpsc, RwLock}; @@ -25,12 +24,12 @@ use wasmer_vfs::{ }; use wasmer_wasi_types::{types::__WASI_STDIN_FILENO, wasi::BusErrno}; -use super::{cconst::ConsoleConst, common::*}; +use super::{cconst::ConsoleConst, common::*, task::TaskJoinHandle}; use crate::{ bin_factory::{spawn_exec, BinFactory, ModuleCache}, os::task::{control_plane::WasiControlPlane, process::WasiProcess}, state::Capabilities, - VirtualTaskManagerExt, WasiEnv, WasiRuntime, + VirtualBusError, VirtualTaskManagerExt, WasiEnv, WasiRuntime, }; #[derive(Derivative)] @@ -146,7 +145,7 @@ impl Console { self } - pub fn run(&mut self) -> Result<(BusSpawnedProcess, WasiProcess), VirtualBusError> { + pub fn run(&mut self) -> Result<(TaskJoinHandle, WasiProcess), VirtualBusError> { // Extract the program name from the arguments let empty_args: Vec<&[u8]> = Vec::new(); let (webc, prog, args) = match self.boot_cmd.split_once(' ') { @@ -220,7 +219,7 @@ impl Console { .ok(); }); tracing::debug!("failed to get webc dependency - {}", webc); - return Err(crate::vbus::VirtualBusError::NotFound); + return Err(VirtualBusError::NotFound); }; let wasi_process = env.process.clone(); diff --git a/lib/wasi/src/os/task/mod.rs b/lib/wasi/src/os/task/mod.rs index 5d17c9871cf..95fe609c93f 100644 --- a/lib/wasi/src/os/task/mod.rs +++ b/lib/wasi/src/os/task/mod.rs @@ -3,5 +3,9 @@ pub mod control_plane; pub mod process; pub mod signal; -pub mod task_join_handle; +mod task_join_handle; pub mod thread; + +pub use task_join_handle::{ + OwnedTaskStatus, TaskJoinHandle, TaskStatus, TaskTerminatedError, VirtualTaskHandle, +}; diff --git a/lib/wasi/src/os/task/process.rs b/lib/wasi/src/os/task/process.rs index aa9554518b6..829d5cd5ed8 100644 --- a/lib/wasi/src/os/task/process.rs +++ b/lib/wasi/src/os/task/process.rs @@ -9,7 +9,7 @@ use std::{ time::Duration, }; -use crate::vbus::{BusSpawnedProcess, SignalHandlerAbi}; +use crate::WasiRuntimeError; use tracing::trace; use wasmer_wasi_types::{ types::Signal, @@ -22,7 +22,11 @@ use crate::{ WasiThread, WasiThreadHandle, WasiThreadId, }; -use super::{control_plane::ControlPlaneError, task_join_handle::TaskJoinHandle}; +use super::{ + control_plane::ControlPlaneError, + signal::{SignalDeliveryError, SignalHandlerAbi}, + task_join_handle::{OwnedTaskStatus, TaskJoinHandle}, +}; /// Represents the ID of a sub-process #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -79,7 +83,7 @@ pub struct WasiProcess { // (we don't want cyclical references) pub(crate) compute: WasiControlPlane, /// Reference to the exit code for the main thread - pub(crate) finished: Arc, + pub(crate) finished: Arc, /// List of all the children spawned from this thread pub(crate) children: Arc>>, /// Number of threads waiting for children to exit @@ -104,11 +108,12 @@ pub struct WasiProcessInner { /// Signals that will be triggered at specific intervals pub signal_intervals: HashMap, /// Represents all the process spun up as a bus process - pub bus_processes: HashMap>, + pub bus_processes: HashMap, /// Indicates if the bus process can be reused pub bus_process_reuse: HashMap, WasiProcessId>, } +// TODO: why do we need this, how is it used? pub(crate) struct WasiProcessWait { waiting: Arc, } @@ -146,7 +151,7 @@ impl WasiProcess { bus_process_reuse: Default::default(), })), children: Arc::new(RwLock::new(Default::default())), - finished: Arc::new(TaskJoinHandle::new()), + finished: Arc::new(OwnedTaskStatus::default()), waiting: Arc::new(AtomicU32::new(0)), } } @@ -189,7 +194,7 @@ impl WasiProcess { is_main = true; self.finished.clone() } else { - Arc::new(TaskJoinHandle::new()) + Arc::new(OwnedTaskStatus::default()) }; let ctrl = WasiThread::new(self.pid(), id, is_main, finished, task_count_guard); @@ -277,19 +282,19 @@ impl WasiProcess { inner.thread_count } - /// Waits until the process is finished or the timeout is reached - pub async fn join(&self) -> Option { + /// Waits until the process is finished. + pub async fn join(&self) -> Result> { let _guard = WasiProcessWait::new(self); self.finished.await_termination().await } /// Attempts to join on the process - pub fn try_join(&self) -> Option { - self.finished.get_exit_code() + pub fn try_join(&self) -> Option>> { + self.finished.status().into_finished() } /// Waits for all the children to be finished - pub async fn join_children(&mut self) -> Option { + pub async fn join_children(&mut self) -> Option>> { let _guard = WasiProcessWait::new(self); let children: Vec<_> = { let children = self.children.read().unwrap(); @@ -313,47 +318,48 @@ impl WasiProcess { futures::future::join_all(waits.into_iter()) .await .into_iter() - .find_map(|a| a) + .next() } /// Waits for any of the children to finished pub async fn join_any_child(&mut self) -> Result, Errno> { let _guard = WasiProcessWait::new(self); - loop { - let children: Vec<_> = { - let children = self.children.read().unwrap(); - children.clone() - }; - if children.is_empty() { - return Err(Errno::Child); - } + let children: Vec<_> = { + let children = self.children.read().unwrap(); + children.clone() + }; + if children.is_empty() { + return Err(Errno::Child); + } - let mut waits = Vec::new(); - for pid in children { - if let Some(process) = self.compute.get_process(pid) { - let children = self.children.clone(); - waits.push(async move { - let join = process.join().await; - let mut children = children.write().unwrap(); - children.retain(|a| *a != pid); - join.map(|exit_code| (pid, exit_code)) - }) - } - } - let woke = futures::future::select_all(waits.into_iter().map(|a| Box::pin(a))) - .await - .0; - if let Some((pid, exit_code)) = woke { - return Ok(Some((pid, exit_code))); + let mut waits = Vec::new(); + for pid in children { + if let Some(process) = self.compute.get_process(pid) { + let children = self.children.clone(); + waits.push(async move { + let join = process.join().await; + let mut children = children.write().unwrap(); + children.retain(|a| *a != pid); + (pid, join) + }) } } + let (pid, res) = futures::future::select_all(waits.into_iter().map(|a| Box::pin(a))) + .await + .0; + + let code = res.unwrap_or_else(|e| e.as_exit_code().unwrap_or(Errno::Canceled as u32)); + + Ok(Some((pid, code))) } /// Terminate the process and all its threads pub fn terminate(&self, exit_code: ExitCode) { + // FIXME: this is wrong, threads might still be running! + // Need special logic for the main thread. let guard = self.inner.read().unwrap(); for thread in guard.threads.values() { - thread.terminate(exit_code) + thread.set_status_finished(Ok(exit_code)) } } @@ -364,9 +370,12 @@ impl WasiProcess { } impl SignalHandlerAbi for WasiProcess { - fn signal(&self, sig: u8) { + fn signal(&self, sig: u8) -> Result<(), SignalDeliveryError> { if let Ok(sig) = sig.try_into() { self.signal_process(sig); + Ok(()) + } else { + Err(SignalDeliveryError) } } } diff --git a/lib/wasi/src/os/task/signal.rs b/lib/wasi/src/os/task/signal.rs index d4da66557e1..1ed12bd62a3 100644 --- a/lib/wasi/src/os/task/signal.rs +++ b/lib/wasi/src/os/task/signal.rs @@ -2,6 +2,19 @@ use std::time::Duration; use wasmer_wasi_types::types::Signal; +#[derive(thiserror::Error, Debug)] +#[error("Signal could not be delivered")] +pub struct SignalDeliveryError; + +/// Signal handles...well...they process signals +pub trait SignalHandlerAbi +where + Self: std::fmt::Debug, +{ + /// Processes a signal + fn signal(&self, signal: u8) -> Result<(), SignalDeliveryError>; +} + #[derive(Debug)] pub struct WasiSignalInterval { /// Signal that will be raised diff --git a/lib/wasi/src/os/task/task_join_handle.rs b/lib/wasi/src/os/task/task_join_handle.rs index 83168237952..be56cadb9e8 100644 --- a/lib/wasi/src/os/task/task_join_handle.rs +++ b/lib/wasi/src/os/task/task_join_handle.rs @@ -1,58 +1,174 @@ -use std::sync::Mutex; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; -use wasmer_wasi_types::wasi::ExitCode; +use wasmer_wasi_types::wasi::{Errno, ExitCode}; + +use crate::WasiRuntimeError; + +#[derive(Clone, Debug)] +pub enum TaskStatus { + Pending, + Running, + Finished(Result>), +} + +impl TaskStatus { + /// Returns `true` if the task status is [`Pending`]. + /// + /// [`Pending`]: TaskStatus::Pending + #[must_use] + pub fn is_pending(&self) -> bool { + matches!(self, Self::Pending) + } + + /// Returns `true` if the task status is [`Running`]. + /// + /// [`Running`]: TaskStatus::Running + #[must_use] + pub fn is_running(&self) -> bool { + matches!(self, Self::Running) + } + + pub fn into_finished(self) -> Option>> { + match self { + Self::Finished(res) => Some(res), + _ => None, + } + } +} + +#[derive(thiserror::Error, Debug)] +#[error("Task already terminated")] +pub struct TaskTerminatedError; + +pub trait VirtualTaskHandle: std::fmt::Debug + Send + Sync + 'static { + fn status(&self) -> TaskStatus; + + /// Polls to check if the process is ready yet to receive commands + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; + + fn poll_finished( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>>; +} /// A handle that allows awaiting the termination of a task, and retrieving its exit code. #[derive(Debug)] -pub struct TaskJoinHandle { - exit_code: Mutex>, - sender: tokio::sync::broadcast::Sender<()>, +pub struct OwnedTaskStatus { + watch: tokio::sync::watch::Sender, } -impl TaskJoinHandle { - pub fn new() -> Self { - let (sender, _) = tokio::sync::broadcast::channel(1); +impl OwnedTaskStatus { + pub fn new(status: TaskStatus) -> Self { Self { - exit_code: Mutex::new(None), - sender, + watch: tokio::sync::watch::channel(status).0, } } + pub fn new_finished_with_code(code: ExitCode) -> Self { + Self::new(TaskStatus::Finished(Ok(code))) + } + /// Marks the task as finished. - pub(super) fn terminate(&self, exit_code: u32) { - let mut lock = self.exit_code.lock().unwrap(); - if lock.is_none() { - *lock = Some(exit_code); - std::mem::drop(lock); - self.sender.send(()).ok(); - } + pub fn set_running(&self) { + self.watch.send_modify(|value| { + // Only set to running if task was pending, otherwise the transition would be invalid. + if value.is_pending() { + *value = TaskStatus::Running; + } + }) + } + + /// Marks the task as finished. + pub(super) fn set_finished(&self, res: Result>) { + let inner = match res { + Ok(code) => Ok(code), + Err(err) => { + if let Some(code) = err.as_exit_code() { + Ok(code) + } else { + Err(err) + } + } + }; + + self.watch.send(TaskStatus::Finished(inner)).ok(); } - pub async fn await_termination(&self) -> Option { - // FIXME: why is this a loop? should not be necessary, - // Should be redundant since the subscriber is created while holding the lock. + pub fn status(&self) -> TaskStatus { + self.watch.borrow().clone() + } + + pub async fn await_termination(&self) -> Result> { + let mut receiver = self.watch.subscribe(); + match &*receiver.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); + } + } loop { - let mut rx = { - let code_opt = self.exit_code.lock().unwrap(); - if code_opt.is_some() { - return *code_opt; + // NOTE: unwrap() is fine, because &self always holds on to the sender. + receiver.changed().await.unwrap(); + match &*receiver.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); } - self.sender.subscribe() - }; - if rx.recv().await.is_err() { - return None; } } } - /// Returns the exit code if the task has finished, and None otherwise. - pub fn get_exit_code(&self) -> Option { - *self.exit_code.lock().unwrap() + pub fn handle(&self) -> TaskJoinHandle { + TaskJoinHandle { + watch: self.watch.subscribe(), + } } } -impl Default for TaskJoinHandle { +impl Default for OwnedTaskStatus { fn default() -> Self { - Self::new() + Self::new(TaskStatus::Pending) + } +} + +/// A handle that allows awaiting the termination of a task, and retrieving its exit code. +#[derive(Clone, Debug)] +pub struct TaskJoinHandle { + watch: tokio::sync::watch::Receiver, +} + +impl TaskJoinHandle { + /// Retrieve the current status. + pub fn status(&self) -> TaskStatus { + self.watch.borrow().clone() + } + + /// Wait until the task finishes. + pub async fn wait_finished(&mut self) -> Result> { + match &*self.watch.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); + } + } + loop { + if self.watch.changed().await.is_err() { + return Ok(Errno::Noent as u32); + } + match &*self.watch.borrow_and_update() { + TaskStatus::Pending | TaskStatus::Running => {} + TaskStatus::Finished(res) => { + return res.clone(); + } + } + } } } diff --git a/lib/wasi/src/os/task/thread.rs b/lib/wasi/src/os/task/thread.rs index d43cd3b771b..a57e2af6867 100644 --- a/lib/wasi/src/os/task/thread.rs +++ b/lib/wasi/src/os/task/thread.rs @@ -11,9 +11,15 @@ use wasmer_wasi_types::{ wasi::{Errno, ExitCode}, }; -use crate::os::task::process::{WasiProcessId, WasiProcessInner}; +use crate::{ + os::task::process::{WasiProcessId, WasiProcessInner}, + WasiRuntimeError, +}; -use super::{control_plane::TaskCountGuard, task_join_handle::TaskJoinHandle}; +use super::{ + control_plane::TaskCountGuard, + task_join_handle::{OwnedTaskStatus, TaskJoinHandle}, +}; /// Represents the ID of a WASI thread #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -84,6 +90,31 @@ pub struct WasiThread { state: Arc, } +/// A guard that ensures a thread is marked as terminated when dropped. +/// +/// Normally the thread result should be manually registered with +/// [`Thread::set_status_running`] or [`Thread::set_status_finished`], but +/// this guard can ensure that the thread is marked as terminated even if this +/// is forgotten or a panic occurs. +pub struct WasiThreadGuard { + pub thread: WasiThread, +} + +impl WasiThreadGuard { + pub fn new(thread: WasiThread) -> Self { + Self { thread } + } +} + +impl Drop for WasiThreadGuard { + fn drop(&mut self) { + self.thread + .set_status_finished(Err( + crate::RuntimeError::new("Thread manager disconnected").into() + )); + } +} + #[derive(Debug)] struct WasiThreadState { is_main: bool, @@ -91,7 +122,7 @@ struct WasiThreadState { id: WasiThreadId, signals: Mutex<(Vec, Vec)>, stack: Mutex, - finished: Arc, + status: Arc, // Registers the task termination with the ControlPlane on drop. // Never accessed, since it's a drop guard. @@ -105,7 +136,7 @@ impl WasiThread { pid: WasiProcessId, id: WasiThreadId, is_main: bool, - finished: Arc, + status: Arc, guard: TaskCountGuard, ) -> Self { Self { @@ -113,7 +144,7 @@ impl WasiThread { is_main, pid, id, - finished, + status, signals: Mutex::new((Vec::new(), Vec::new())), stack: Mutex::new(ThreadStack::default()), _task_count_guard: guard, @@ -136,25 +167,34 @@ impl WasiThread { self.state.is_main } + /// Get a join handle to watch the task status. + pub fn join_handle(&self) -> TaskJoinHandle { + self.state.status.handle() + } + // TODO: this should be private, access should go through utility methods. pub fn signals(&self) -> &Mutex<(Vec, Vec)> { &self.state.signals } + pub fn set_status_running(&self) { + self.state.status.set_running(); + } + /// Marks the thread as finished (which will cause anyone that /// joined on it to wake up) - pub fn terminate(&self, exit_code: u32) { - self.state.finished.terminate(exit_code); + pub fn set_status_finished(&self, res: Result) { + self.state.status.set_finished(res.map_err(Arc::new)); } /// Waits until the thread is finished or the timeout is reached - pub async fn join(&self) -> Option { - self.state.finished.await_termination().await + pub async fn join(&self) -> Result> { + self.state.status.await_termination().await } /// Attempts to join on the thread - pub fn try_join(&self) -> Option { - self.state.finished.get_exit_code() + pub fn try_join(&self) -> Option>> { + self.state.status.status().into_finished() } /// Adds a signal for this thread to process @@ -362,7 +402,7 @@ impl Drop for WasiThreadHandle { if let Some(id) = Arc::get_mut(&mut self.id) { let mut inner = self.inner.write().unwrap(); if let Some(ctrl) = inner.threads.remove(id) { - ctrl.terminate(0); + ctrl.set_status_finished(Ok(0)); } inner.thread_count -= 1; } diff --git a/lib/wasi/src/os/tty/mod.rs b/lib/wasi/src/os/tty/mod.rs index c32502886e8..02e29da0edb 100644 --- a/lib/wasi/src/os/tty/mod.rs +++ b/lib/wasi/src/os/tty/mod.rs @@ -3,7 +3,6 @@ use std::{ sync::{Arc, Mutex}, }; -use crate::vbus::SignalHandlerAbi; use derivative::*; use futures::future::BoxFuture; use wasmer_vfs::{AsyncWriteExt, NullFile, VirtualFile}; @@ -11,6 +10,8 @@ use wasmer_wasi_types::wasi::{Signal, Snapshot0Clockid}; use crate::syscalls::platform_clock_time_get; +use super::task::signal::SignalHandlerAbi; + const TTY_MOBILE_PAUSE: u128 = std::time::Duration::from_millis(200).as_nanos(); #[cfg(feature = "host-termios")] @@ -229,7 +230,7 @@ impl Tty { fn on_ctrl_c(mut self, _data: Cow<'static, [u8]>) -> BoxFuture<'static, Self> { Box::pin(async move { if let Some(signaler) = self.signaler.as_ref() { - signaler.signal(Signal::Sigint as u8); + signaler.signal(Signal::Sigint as u8).ok(); let (echo, _line_buffering) = { let options = self.options.inner.lock().unwrap(); diff --git a/lib/wasi/src/state/builder.rs b/lib/wasi/src/state/builder.rs index cf62e6ad99b..4957fb5ea99 100644 --- a/lib/wasi/src/state/builder.rs +++ b/lib/wasi/src/state/builder.rs @@ -752,6 +752,7 @@ impl WasiEnvBuilder { let start = instance.exports.get_function("_start")?; + env.data(store).thread.set_status_running(); let res = crate::run_wasi_func_start(start, store); let exit_code = match &res { diff --git a/lib/wasi/src/state/env.rs b/lib/wasi/src/state/env.rs index 60d50a7a30e..ffb989d0d77 100644 --- a/lib/wasi/src/state/env.rs +++ b/lib/wasi/src/state/env.rs @@ -533,7 +533,7 @@ impl WasiEnv { let signal_cnt = signals.len(); for sig in signals { if sig == Signal::Sigint || sig == Signal::Sigquit || sig == Signal::Sigkill { - env.thread.terminate(Errno::Intr as u32); + env.thread.set_status_finished(Ok(Errno::Intr as u32)); return Err(WasiError::Exit(Errno::Intr as u32)); } else { trace!("wasi[{}]::signal-ignored: {:?}", env.pid(), sig); @@ -562,7 +562,7 @@ impl WasiEnv { .thread .has_signal(&[Signal::Sigint, Signal::Sigquit, Signal::Sigkill]) { - env.thread.terminate(Errno::Intr as u32); + env.thread.set_status_finished(Ok(Errno::Intr as u32)); } return Ok(Ok(false)); } @@ -649,10 +649,10 @@ impl WasiEnv { pub fn should_exit(&self) -> Option { // Check for forced exit if let Some(forced_exit) = self.thread.try_join() { - return Some(forced_exit); + return Some(forced_exit.unwrap_or(Errno::Child as u32)); } if let Some(forced_exit) = self.process.try_join() { - return Some(forced_exit); + return Some(forced_exit.unwrap_or(Errno::Child as u32)); } None } diff --git a/lib/wasi/src/state/mod.rs b/lib/wasi/src/state/mod.rs index fb407ca99af..0fca7b274ea 100644 --- a/lib/wasi/src/state/mod.rs +++ b/lib/wasi/src/state/mod.rs @@ -30,7 +30,6 @@ use std::{ time::Duration, }; -use crate::vbus::VirtualBusInvocation; use derivative::Derivative; #[cfg(feature = "enable-serde")] use serde::{Deserialize, Serialize}; @@ -48,7 +47,6 @@ pub use self::{ pub use crate::fs::{InodeGuard, InodeWeakGuard}; use crate::{ fs::{fs_error_into_wasi_err, WasiFs, WasiFsRoot, WasiInodes, WasiStateFileGuard}, - os::task::process::WasiProcessId, syscalls::types::*, utils::WasiParkingLot, WasiCallingId, @@ -107,12 +105,6 @@ pub struct WasiFutex { pub(crate) wakers: Vec, } -#[derive(Debug)] -pub struct WasiBusCall { - pub bid: WasiProcessId, - pub invocation: Box, -} - /// Structure that holds the state of BUS calls to this process and from /// this process. BUS calls are the equivalent of RPC's with support /// for all the major serializers diff --git a/lib/wasi/src/state/types.rs b/lib/wasi/src/state/types.rs index 381e4654416..7a0fe98f48d 100644 --- a/lib/wasi/src/state/types.rs +++ b/lib/wasi/src/state/types.rs @@ -5,9 +5,10 @@ use cfg_if::cfg_if; #[cfg(feature = "enable-serde")] use serde::{Deserialize, Serialize}; -use crate::vbus::VirtualBusError; use wasmer_wasi_types::wasi::{BusErrno, Rights}; +use crate::VirtualBusError; + cfg_if! { if #[cfg(feature = "host-fs")] { pub use wasmer_vfs::host_fs::{Stderr, Stdin, Stdout}; diff --git a/lib/wasi/src/syscalls/mod.rs b/lib/wasi/src/syscalls/mod.rs index ee2ef1165cb..1aa18162409 100644 --- a/lib/wasi/src/syscalls/mod.rs +++ b/lib/wasi/src/syscalls/mod.rs @@ -62,10 +62,6 @@ pub use unix::*; #[cfg(any(target_family = "wasm"))] pub use wasm::*; -pub(crate) use crate::vbus::{ - BusInvocationEvent, BusSpawnedProcess, SignalHandlerAbi, StdioMode, VirtualBusError, - VirtualBusInvokedWait, -}; pub(crate) use wasmer::{ AsStoreMut, AsStoreRef, Extern, Function, FunctionEnv, FunctionEnvMut, Global, Instance, Memory, Memory32, Memory64, MemoryAccessError, MemoryError, MemorySize, MemoryView, Module, @@ -81,10 +77,10 @@ pub use windows::*; pub(crate) use self::types::{ wasi::{ - Addressfamily, Advice, Bid, BusDataFormat, BusErrno, BusHandles, Cid, Clockid, Dircookie, - Dirent, Errno, Event, EventFdReadwrite, Eventrwflags, Eventtype, ExitCode, Fd as WasiFd, - Fdflags, Fdstat, Filesize, Filestat, Filetype, Fstflags, Linkcount, Longsize, OptionFd, - Pid, Prestat, Rights, Snapshot0Clockid, Sockoption, Sockstatus, Socktype, StackSnapshot, + Addressfamily, Advice, Bid, BusErrno, BusHandles, Cid, Clockid, Dircookie, Dirent, Errno, + Event, EventFdReadwrite, Eventrwflags, Eventtype, ExitCode, Fd as WasiFd, Fdflags, Fdstat, + Filesize, Filestat, Filetype, Fstflags, Linkcount, Longsize, OptionFd, Pid, Prestat, + Rights, Snapshot0Clockid, Sockoption, Sockstatus, Socktype, StackSnapshot, StdioMode as WasiStdioMode, Streamsecurity, Subscription, SubscriptionFsReadwrite, Tid, Timestamp, TlKey, TlUser, TlVal, Tty, Whence, }, @@ -106,7 +102,7 @@ pub(crate) use crate::{ runtime::{task_manager::VirtualTaskManagerExt, SpawnType}, state::{ self, bus_errno_into_vbus_error, iterate_poll_events, vbus_error_into_bus_errno, - InodeGuard, InodeWeakGuard, PollEvent, PollEventBuilder, WasiBusCall, WasiFutex, WasiState, + InodeGuard, InodeWeakGuard, PollEvent, PollEventBuilder, WasiFutex, WasiState, WasiThreadContext, }, utils::{self, map_io_err}, @@ -119,7 +115,7 @@ use crate::{ MAX_SYMLINKS, }, utils::store::InstanceSnapshot, - WasiInodes, + VirtualBusError, WasiInodes, }; pub(crate) use crate::{net::net_error_into_wasi_err, utils::WasiParkingLot}; @@ -1101,12 +1097,3 @@ pub(crate) fn conv_bus_err_to_exit_code(err: VirtualBusError) -> ExitCode { _ => Errno::Inval as ExitCode, } } - -// Function for converting the format -pub(crate) fn conv_bus_format(format: BusDataFormat) -> BusDataFormat { - format -} - -pub(crate) fn conv_bus_format_from(format: BusDataFormat) -> BusDataFormat { - format -} diff --git a/lib/wasi/src/syscalls/wasi/proc_exit.rs b/lib/wasi/src/syscalls/wasi/proc_exit.rs index ce5cbe89c37..83869eacb76 100644 --- a/lib/wasi/src/syscalls/wasi/proc_exit.rs +++ b/lib/wasi/src/syscalls/wasi/proc_exit.rs @@ -20,7 +20,7 @@ pub fn proc_exit( ); // Set the exit code for this process - ctx.data().thread.terminate(code as u32); + ctx.data().thread.set_status_finished(Ok(code as u32)); // If we are in a vfork we need to return to the point we left off if let Some(mut vfork) = ctx.data_mut().vfork.take() { diff --git a/lib/wasi/src/syscalls/wasix/proc_exec.rs b/lib/wasi/src/syscalls/wasix/proc_exec.rs index 5eef36e748d..704bb8e1e18 100644 --- a/lib/wasi/src/syscalls/wasix/proc_exec.rs +++ b/lib/wasi/src/syscalls/wasix/proc_exec.rs @@ -1,5 +1,8 @@ use super::*; -use crate::syscalls::*; +use crate::{ + os::task::{OwnedTaskStatus, TaskStatus}, + syscalls::*, +}; /// Replaces the current process with a new process /// @@ -161,14 +164,14 @@ pub fn proc_exec( ctx.data().tid(), err_exit_code ); - BusSpawnedProcess::exited_process(err_exit_code) + OwnedTaskStatus::new(TaskStatus::Finished(Ok(err_exit_code))).handle() } }; // Add the process to the environment state { let mut inner = ctx.data().process.write(); - inner.bus_processes.insert(child_pid, Box::new(process)); + inner.bus_processes.insert(child_pid, process); } let mut memory_stack = vfork.memory_stack; @@ -270,13 +273,8 @@ pub fn proc_exec( let (tx, rx) = std::sync::mpsc::channel(); let tasks_inner = tasks.clone(); tasks.block_on(Box::pin(async move { - loop { - tasks_inner.sleep_now(Duration::from_millis(5)).await; - if let Some(exit_code) = process.inst.exit_code() { - tx.send(exit_code).unwrap(); - break; - } - } + let code = process.wait_finished().await.unwrap_or(Errno::Child as u32); + tx.send(code); })); let exit_code = rx.recv().unwrap(); OnCalledAction::Trap(Box::new(WasiError::Exit(exit_code as ExitCode))) diff --git a/lib/wasi/src/syscalls/wasix/proc_fork.rs b/lib/wasi/src/syscalls/wasix/proc_fork.rs index ac4bdf3dfd5..22ba49086df 100644 --- a/lib/wasi/src/syscalls/wasix/proc_fork.rs +++ b/lib/wasi/src/syscalls/wasix/proc_fork.rs @@ -1,5 +1,5 @@ use super::*; -use crate::syscalls::*; +use crate::{os::task::OwnedTaskStatus, syscalls::*}; #[cfg(feature = "sys")] use wasmer::vm::VMMemory; @@ -331,18 +331,9 @@ pub fn proc_fork( }; // Add the process to the environment state - let process = BusSpawnedProcess { - inst: Box::new(crate::bin_factory::SpawnedProcess { - exit_code: Mutex::new(None), - exit_code_rx: Mutex::new(exit_code_rx), - }), - stdin: None, - stdout: None, - stderr: None, - signaler: Some(signaler), - module_memory_footprint: 0, - file_system_memory_footprint: 0, - }; + + let process = OwnedTaskStatus::default(); + { trace!( "wasi[{}:{}]::spawned sub-process (pid={})", @@ -351,7 +342,7 @@ pub fn proc_fork( child_pid.raw() ); let mut inner = ctx.data().process.write(); - inner.bus_processes.insert(child_pid, Box::new(process)); + inner.bus_processes.insert(child_pid, process.handle()); } // If the return value offset is within the memory stack then we need diff --git a/lib/wasi/src/syscalls/wasix/proc_join.rs b/lib/wasi/src/syscalls/wasix/proc_join.rs index e886ba2b3a2..f55839be17d 100644 --- a/lib/wasi/src/syscalls/wasix/proc_join.rs +++ b/lib/wasi/src/syscalls/wasix/proc_join.rs @@ -66,7 +66,8 @@ pub fn proc_join( let process = env.process.control_plane().get_process(pid); if let Some(process) = process { let exit_code = wasi_try_ok!(__asyncify(&mut ctx, None, async move { - process.join().await.ok_or(Errno::Child) + let code = process.join().await.unwrap_or(Errno::Child as u32); + Ok(code) })?); trace!("child ({}) exited with {}", pid.raw(), exit_code); diff --git a/lib/wasi/src/syscalls/wasix/proc_spawn.rs b/lib/wasi/src/syscalls/wasix/proc_spawn.rs index c679c179328..84bfa4d0b0a 100644 --- a/lib/wasi/src/syscalls/wasix/proc_spawn.rs +++ b/lib/wasi/src/syscalls/wasix/proc_spawn.rs @@ -267,7 +267,7 @@ pub fn proc_spawn_internal( { let mut guard = env.process.write(); - guard.bus_processes.insert(child_pid, Box::new(process)); + guard.bus_processes.insert(child_pid, process); }; let handles = BusHandles { diff --git a/lib/wasi/src/vbus.rs b/lib/wasi/src/vbus.rs deleted file mode 100644 index 932c20ccca3..00000000000 --- a/lib/wasi/src/vbus.rs +++ /dev/null @@ -1,400 +0,0 @@ -use std::fmt; -use std::future::Future; -use std::ops::DerefMut; -use std::pin::Pin; -use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; -use thiserror::Error; - -pub use wasmer_vfs::StdioMode; -use wasmer_vfs::VirtualFile; -use wasmer_wasi_types::wasi::{BusDataFormat, ExitCode}; - -enum BusSpawnedProcessJoinResult { - Active(Box), - Finished(Option), -} - -#[derive(Clone)] -pub struct BusSpawnedProcessJoin { - inst: Arc>, -} - -impl BusSpawnedProcessJoin { - pub fn new(process: BusSpawnedProcess) -> Self { - Self { - inst: Arc::new(Mutex::new(BusSpawnedProcessJoinResult::Active( - process.inst, - ))), - } - } - - pub fn poll_finished(&self, cx: &mut Context<'_>) -> Poll> { - let mut guard = self.inst.lock().unwrap(); - match guard.deref_mut() { - BusSpawnedProcessJoinResult::Active(inst) => { - let pinned_inst = Pin::new(inst.as_mut()); - match pinned_inst.poll_ready(cx) { - Poll::Ready(_) => { - let exit_code = inst.exit_code(); - let mut swap = BusSpawnedProcessJoinResult::Finished(exit_code); - std::mem::swap(guard.deref_mut(), &mut swap); - Poll::Ready(exit_code) - } - Poll::Pending => Poll::Pending, - } - } - BusSpawnedProcessJoinResult::Finished(exit_code) => Poll::Ready(*exit_code), - } - } -} - -impl Future for BusSpawnedProcessJoin { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.poll_finished(cx) - } -} - -impl std::fmt::Debug for BusSpawnedProcessJoin { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BusSpawnedProcessJoin").finish() - } -} - -/// Signal handles...well...they process signals -pub trait SignalHandlerAbi -where - Self: std::fmt::Debug, -{ - /// Processes a signal - fn signal(&self, sig: u8); -} - -#[derive(Debug)] -pub struct BusSpawnedProcess { - /// Reference to the spawned instance - pub inst: Box, - /// Virtual file used for stdin - pub stdin: Option>, - /// Virtual file used for stdout - pub stdout: Option>, - /// Virtual file used for stderr - pub stderr: Option>, - /// The signal handler for this process (if any) - pub signaler: Option>, - /// Amount of memory that the module uses - pub module_memory_footprint: u64, - /// Combined memory uses by the module and the file system - pub file_system_memory_footprint: u64, -} - -impl BusSpawnedProcess { - pub fn exited_process(exit_code: ExitCode) -> Self { - Self { - inst: Box::new(ExitedProcess { exit_code }), - stdin: None, - stdout: None, - stderr: None, - signaler: None, - module_memory_footprint: 0, - file_system_memory_footprint: 0, - } - } -} - -pub trait VirtualBusScope: fmt::Debug + Send + Sync + 'static { - //// Returns true if the invokable target has finished - fn poll_finished(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>; -} - -pub trait VirtualBusInvokable: fmt::Debug + Send + Sync + 'static { - /// Invokes a service within this instance - #[allow(unused_variables)] - fn invoke( - &self, - topic_hash: u128, - format: BusDataFormat, - buf: Vec, - ) -> Box { - Box::new(UnsupportedBusInvoker::default()) - } -} - -#[derive(Debug, Default)] -struct UnsupportedBusInvoker {} - -impl VirtualBusInvoked for UnsupportedBusInvoker { - #[allow(unused_variables)] - fn poll_invoked( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, VirtualBusError>> { - Poll::Ready(Err(VirtualBusError::Unsupported)) - } -} - -pub trait VirtualBusInvoked: fmt::Debug + Unpin + 'static { - //// Returns once the bus has been invoked (or failed) - fn poll_invoked( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, VirtualBusError>>; -} -pub struct VirtualBusInvokedWait { - invoked: Box, -} -impl VirtualBusInvokedWait { - pub fn new(invoked: Box) -> Self { - Self { invoked } - } -} -impl Future for VirtualBusInvokedWait { - type Output = Result, VirtualBusError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let invoked = Pin::new(self.invoked.deref_mut()); - invoked.poll_invoked(cx) - } -} - -pub trait VirtualBusProcess: - VirtualBusScope + VirtualBusInvokable + fmt::Debug + Send + Sync + 'static -{ - /// Returns the exit code if the instance has finished - fn exit_code(&self) -> Option; - - /// Polls to check if the process is ready yet to receive commands - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()>; -} - -pub trait VirtualBusInvocation: - VirtualBusInvokable + fmt::Debug + Send + Sync + Unpin + 'static -{ - /// Polls for new listen events related to this context - fn poll_event(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; -} - -#[derive(Debug)] -pub struct InstantInvocation { - val: Option, - err: Option, - call: Option>, -} - -impl InstantInvocation { - pub fn response(format: BusDataFormat, data: Vec) -> Self { - Self { - val: Some(BusInvocationEvent::Response { format, data }), - err: None, - call: None, - } - } - - pub fn fault(err: VirtualBusError) -> Self { - Self { - val: None, - err: Some(err), - call: None, - } - } - - pub fn call(val: Box) -> Self { - Self { - val: None, - err: None, - call: Some(val), - } - } -} - -impl VirtualBusInvoked for InstantInvocation { - fn poll_invoked( - mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, VirtualBusError>> { - if let Some(err) = self.err.take() { - return Poll::Ready(Err(err)); - } - if let Some(val) = self.val.take() { - return Poll::Ready(Ok(Box::new(InstantInvocation { - val: Some(val), - err: None, - call: None, - }))); - } - match self.call.take() { - Some(val) => Poll::Ready(Ok(val)), - None => Poll::Ready(Err(VirtualBusError::AlreadyConsumed)), - } - } -} - -impl VirtualBusInvocation for InstantInvocation { - fn poll_event(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - match self.val.take() { - Some(val) => Poll::Ready(val), - None => Poll::Ready(BusInvocationEvent::Fault { - fault: VirtualBusError::AlreadyConsumed, - }), - } - } -} - -impl VirtualBusInvokable for InstantInvocation { - fn invoke( - &self, - _topic_hash: u128, - _format: BusDataFormat, - _buf: Vec, - ) -> Box { - Box::new(InstantInvocation { - val: None, - err: Some(VirtualBusError::InvalidTopic), - call: None, - }) - } -} - -#[derive(Debug)] -pub enum BusInvocationEvent { - /// The server has sent some out-of-band data to you - Callback { - /// Topic that this call relates to - topic_hash: u128, - /// Format of the data we received - format: BusDataFormat, - /// Data passed in the call - data: Vec, - }, - /// The service has a responded to your call - Response { - /// Format of the data we received - format: BusDataFormat, - /// Data returned by the call - data: Vec, - }, - /// The service has responded with a fault - Fault { - /// Fault code that was raised - fault: VirtualBusError, - }, -} - -pub trait VirtualBusListener: fmt::Debug + Send + Sync + Unpin + 'static { - /// Polls for new calls to this service - fn poll(self: Pin<&Self>, cx: &mut Context<'_>) -> Poll; -} - -#[derive(Debug)] -pub struct BusCallEvent { - /// Topic hash that this call relates to - pub topic_hash: u128, - /// Reference to the call itself - pub called: Box, - /// Format of the data we received - pub format: BusDataFormat, - /// Data passed in the call - pub data: Vec, -} - -pub trait VirtualBusCalled: fmt::Debug + Send + Sync + 'static { - /// Polls for new calls to this service - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; - - /// Sends an out-of-band message back to the caller - fn callback(&self, topic_hash: u128, format: BusDataFormat, buf: Vec); - - /// Informs the caller that their call has failed - fn fault(self: Box, fault: VirtualBusError); - - /// Finishes the call and returns a particular response - fn reply(&self, format: BusDataFormat, buf: Vec); -} - -#[derive(Error, Copy, Clone, Debug, PartialEq, Eq)] -pub enum VirtualBusError { - /// Failed during serialization - #[error("serialization failed")] - Serialization, - /// Failed during deserialization - #[error("deserialization failed")] - Deserialization, - /// Invalid WAPM process - #[error("invalid wapm")] - InvalidWapm, - /// Failed to fetch the WAPM process - #[error("fetch failed")] - FetchFailed, - /// Failed to compile the WAPM process - #[error("compile error")] - CompileError, - /// Invalid ABI - #[error("WAPM process has an invalid ABI")] - InvalidABI, - /// Call was aborted - #[error("call aborted")] - Aborted, - /// Bad handle - #[error("bad handle")] - BadHandle, - /// Invalid topic - #[error("invalid topic")] - InvalidTopic, - /// Invalid callback - #[error("invalid callback")] - BadCallback, - /// Call is unsupported - #[error("unsupported")] - Unsupported, - /// Not found - #[error("not found")] - NotFound, - /// Bad request - #[error("bad request")] - BadRequest, - /// Access denied - #[error("access denied")] - AccessDenied, - /// Internal error has occured - #[error("internal error")] - InternalError, - /// Memory allocation failed - #[error("memory allocation failed")] - MemoryAllocationFailed, - /// Invocation has failed - #[error("invocation has failed")] - InvokeFailed, - /// Already consumed - #[error("already consumed")] - AlreadyConsumed, - /// Memory access violation - #[error("memory access violation")] - MemoryAccessViolation, - /// Some other unhandled error. If you see this, it's probably a bug. - #[error("unknown error found")] - UnknownError, -} - -#[derive(Debug)] -pub struct ExitedProcess { - pub exit_code: ExitCode, -} - -impl VirtualBusProcess for ExitedProcess { - fn exit_code(&self) -> Option { - Some(self.exit_code) - } - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { - Poll::Ready(()) - } -} - -impl VirtualBusScope for ExitedProcess { - fn poll_finished(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - VirtualBusProcess::poll_ready(self, cx) - } -} - -impl VirtualBusInvokable for ExitedProcess {} diff --git a/lib/wasi/tests/stdio.rs b/lib/wasi/tests/stdio.rs index d7899f68e72..e1480720330 100644 --- a/lib/wasi/tests/stdio.rs +++ b/lib/wasi/tests/stdio.rs @@ -43,7 +43,7 @@ mod sys { async fn test_stdout() { let mut store = Store::default(); - let module = Module::new(&mut store, br#" + let module = Module::new(&store, br#" (module ;; Import the required fd_write WASI function which will write the given io vectors to stdout ;; The function signature for fd_write is: