Skip to content

Commit

Permalink
Added a WasiEnvBuilder::run_with_store_async() method
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-F-Bryan committed Jun 1, 2023
1 parent db7b2e4 commit d38dc60
Showing 1 changed file with 164 additions and 24 deletions.
188 changes: 164 additions & 24 deletions lib/wasi/src/state/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use std::{
sync::Arc,
};

use bytes::Bytes;
use rand::Rng;
use thiserror::Error;
use virtual_fs::{ArcFile, FsError, TmpFileSystem, VirtualFile};
use wasmer::{AsStoreMut, Instance, Module, Store};
use wasmer_wasix_types::wasi::Errno;
use wasmer::{AsStoreMut, Instance, Module, RuntimeError, Store};
use wasmer_wasix_types::wasi::{Errno, ExitCode};

#[cfg(feature = "sys")]
use crate::PluggableRuntime;
Expand All @@ -21,7 +22,7 @@ use crate::{
os::task::control_plane::{ControlPlaneConfig, ControlPlaneError, WasiControlPlane},
state::WasiState,
syscalls::types::{__WASI_STDERR_FILENO, __WASI_STDIN_FILENO, __WASI_STDOUT_FILENO},
Runtime, WasiEnv, WasiFunctionEnv, WasiRuntimeError,
RewindState, Runtime, WasiEnv, WasiError, WasiFunctionEnv, WasiRuntimeError,
};

use super::env::WasiEnvInit;
Expand Down Expand Up @@ -792,40 +793,179 @@ impl WasiEnvBuilder {

#[allow(clippy::result_large_err)]
pub fn run_with_store(self, module: Module, store: &mut Store) -> Result<(), WasiRuntimeError> {
if self.capabilites.threading.enable_asynchronous_threading {
tracing::warn!(
"The enable_asynchronous_threading capability is enabled. Use WasiEnvBuilder::run_with_store_async() to avoid spurious errors.",
);
}

let (instance, env) = self.instantiate(module, store)?;

let start = instance.exports.get_function("_start")?;
env.data(&store).thread.set_status_running();

env.data(store).thread.set_status_running();
let mut res = crate::run_wasi_func_start(start, store);
let result = crate::run_wasi_func_start(start, store);
let (result, exit_code) = wasi_exit_code(result);

let pid = env.data(&store).pid();
let tid = env.data(&store).tid();
tracing::trace!(
"wasi[{}:{}]::main exit (code = {:?})",
env.data(store).pid(),
env.data(store).tid(),
res
%pid,
%tid,
%exit_code,
error=result.as_ref().err().map(|e| &*e as &dyn std::error::Error),
"main exit",
);

let exit_code = match &res {
Ok(_) => Errno::Success.into(),
Err(err) => match err.as_exit_code() {
Some(code) if code.is_success() => {
// This is actually not an error, so we need to fix up the
// result
res = Ok(());
Errno::Success.into()
}
Some(other) => other,
None => Errno::Noexec.into(),
},
};

env.cleanup(store, Some(exit_code));

res
result
}

/// Start the WASI executable with async threads enabled.
#[allow(clippy::result_large_err)]
pub fn run_with_store_async(
self,
module: Module,
mut store: Store,
) -> Result<(), WasiRuntimeError> {
let (instance, env) = self.instantiate(module, &mut store)?;

let start = instance.exports.get_function("_start")?.clone();
env.data(&store).thread.set_status_running();

let tasks = env.data(&store).tasks().clone();
let pid = env.data(&store).pid();
let tid = env.data(&store).tid();

// The return value is passed synchronously and will block until the result
// is returned this is because the main thread can go into a deep sleep and
// exit the dedicated thread
let (tx, rx) = std::sync::mpsc::channel();

tasks.task_dedicated(Box::new(move || {
run_with_deep_sleep(store, start, None, env, tx);
}));

let result = rx.recv()
.expect("main thread terminated without a result, this normally means a panic occurred within the main thread");
let (result, exit_code) = wasi_exit_code(result);

tracing::trace!(
%pid,
%tid,
%exit_code,
error=result.as_ref().err().map(|e| &*e as &dyn std::error::Error),
"main exit",
);

result
}
}

/// Extract the exit code from a `Result<(), WasiRuntimeError>`.
///
/// We need this because calling `exit(0)` inside a WASI program technically
/// triggers [`WasiError`] with an exit code of `0`, but the end user won't want
/// that treated as an error.
fn wasi_exit_code(
mut result: Result<(), WasiRuntimeError>,
) -> (Result<(), WasiRuntimeError>, ExitCode) {
let exit_code = match &result {
Ok(_) => Errno::Success.into(),
Err(err) => match err.as_exit_code() {
Some(code) if code.is_success() => {
// This is actually not an error, so we need to fix up the
// result
result = Ok(());
Errno::Success.into()
}
Some(other) => other,
None => Errno::Noexec.into(),
},
};

(result, exit_code)
}

fn run_with_deep_sleep(
mut store: Store,
start: wasmer::Function,
rewind_state: Option<(RewindState, Bytes)>,
env: WasiFunctionEnv,
sender: std::sync::mpsc::Sender<Result<(), WasiRuntimeError>>,
) {
if let Some((rewind_state, rewind_result)) = rewind_state {
tracing::trace!("Rewinding");
let errno = if rewind_state.is_64bit {
crate::rewind_ext::<wasmer_types::Memory64>(
env.env.clone().into_mut(&mut store),
rewind_state.memory_stack,
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
)
} else {
crate::rewind_ext::<wasmer_types::Memory64>(
env.env.clone().into_mut(&mut store),
rewind_state.memory_stack,
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
)
};

if errno != Errno::Success {
let exit_code = ExitCode::from(errno);
env.cleanup(&mut store, Some(exit_code));
sender.send(Err(WasiRuntimeError::Wasi(WasiError::Exit(exit_code))));
return;
}
}

let result = start.call(&mut store, &[]);
handle_result(store, env, result, start, sender);
}

fn handle_result(
mut store: Store,
env: WasiFunctionEnv,
result: Result<Box<[wasmer::Value]>, RuntimeError>,
start: wasmer::Function,
sender: std::sync::mpsc::Sender<Result<(), WasiRuntimeError>>,
) {
let result = match result.map_err(|e| e.downcast::<WasiError>()) {
Err(Ok(WasiError::DeepSleep(work))) => {
let pid = env.data(&store).pid();
let tid = env.data(&store).tid();
tracing::trace!(%pid, %tid, "entered a deep sleep");

let tasks = env.data(&store).tasks().clone();
let rewind = work.rewind;
let respawn = {
let env = env.clone();
move |ctx, store, res| {
run_with_deep_sleep(store, start, Some((rewind, res)), env, sender)
}
};

// Spawns the WASM process after a trigger
unsafe {
tasks.resume_wasm_after_poller(Box::new(respawn), env, store, work.trigger);
}

return;
}
Ok(_) => Ok(()),
Err(Ok(other)) => Err(other.into()),
Err(Err(e)) => Err(e.into()),
};

let (result, exit_code) = wasi_exit_code(result);
env.cleanup(&mut store, Some(exit_code));
sender.send(result);
}

/// Builder for preopened directories.
#[derive(Debug, Default)]
pub struct PreopenDirBuilder {
Expand Down

0 comments on commit d38dc60

Please sign in to comment.