Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous threading phase2 #3771

Merged
merged 34 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
15dd412
Use of the inner() method of WasiEnv is now marked as unsafe, also re…
john-sharratt Apr 15, 2023
e2fd1da
Removed the default stack size as its unreliable
john-sharratt Apr 15, 2023
6d2d2da
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt Apr 15, 2023
bd3f8bb
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt Apr 15, 2023
c069cbe
Fixed the last merge
john-sharratt Apr 15, 2023
5390537
Fixed some more unsafe references
john-sharratt Apr 15, 2023
7c84b56
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt Apr 15, 2023
459ce96
Fixed some linting issues
john-sharratt Apr 15, 2023
1b9847a
Fixed a compile issue
john-sharratt Apr 15, 2023
f712ab3
Added back in a default stack size for things that are compiled that …
john-sharratt Apr 15, 2023
7a95f46
Refactored the unsafe cleanup so that its safe
john-sharratt Apr 15, 2023
d6ddf20
Fixed a linting error
john-sharratt Apr 15, 2023
be9ea1d
More fixes for the unsafe conversion
john-sharratt Apr 15, 2023
25e80d2
There is no need to change this line
john-sharratt Apr 15, 2023
99f4048
Missed a space
john-sharratt Apr 15, 2023
cd48bac
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt Apr 19, 2023
ff7d185
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt Apr 19, 2023
6fd6abc
Resolved merge conflicts
john-sharratt Apr 27, 2023
4970316
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt May 13, 2023
e4c187d
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt May 13, 2023
e9ef2e0
Merge branch 'asynchronous-threading' into asynchronous-threading-phase2
john-sharratt May 13, 2023
5611336
Removed the thread locals that are no longer needed
john-sharratt May 13, 2023
b1f40f0
Merge branch 'master' into asynchronous-threading-phase2
john-sharratt May 13, 2023
7c45ce2
Fixed some linting issues
john-sharratt May 13, 2023
3982cb5
Fixed all the review comments
john-sharratt May 13, 2023
f2190b4
Update lib/wasi/src/runtime/task_manager/mod.rs
john-sharratt May 15, 2023
f5f86f4
Merge branch 'asynchronous-threading-phase2' of github.com:wasmerio/w…
john-sharratt May 13, 2023
1699719
Merge branch 'master' into asynchronous-threading-phase2
theduke May 15, 2023
47c3a44
Fixed the compile errors for JSC
john-sharratt May 16, 2023
8e68249
Merge branch 'asynchronous-threading-phase2' of github.com:wasmerio/w…
john-sharratt May 16, 2023
9c0b538
Fixed a compile issue and added a CI/CD test for JSC
john-sharratt May 16, 2023
d5d0b51
Iterating the JSC CI test
john-sharratt May 16, 2023
949bda9
Added a missing dependency on the JSC CI
john-sharratt May 16, 2023
17d80de
Iterating the JSC CI test
john-sharratt May 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ hex = "0.4.3"
flate2 = "1.0.25"
cargo_metadata = "0.15.2"
tar = "0.4.38"
bytes = "1"
thiserror = "1.0.37"
log = "0.4.17"
semver = "1.0.14"
Expand Down
2 changes: 1 addition & 1 deletion lib/cli/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl RunWithPathBuf {
.instantiate(&mut store, &module, program_name, self.args.clone())
.with_context(|| "failed to instantiate WASI module")?;

let capable_of_deep_sleep = ctx.data(&store).capable_of_deep_sleep();
let capable_of_deep_sleep = unsafe { ctx.data(&store).capable_of_deep_sleep() };
ctx.data_mut(&mut store)
.enable_deep_sleep = capable_of_deep_sleep;

Expand Down
37 changes: 17 additions & 20 deletions lib/cli/src/commands/run/wasi.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::utils::{parse_envvar, parse_mapdir};
use anyhow::Result;
use bytes::Bytes;
use std::{
collections::{BTreeSet, HashMap},
path::{Path, PathBuf},
Expand All @@ -12,7 +13,7 @@ use wasmer::{
use wasmer_wasix::{
default_fs_backing, get_wasi_versions,
os::{tty_sys::SysTty, TtyBridge},
rewind,
rewind_ext,
runners::MappedDirectory,
runtime::task_manager::tokio::TokioTaskManager,
types::__WASI_STDIN_FILENO,
Expand Down Expand Up @@ -270,40 +271,30 @@ impl Wasi {
run: RunProperties,
mut store: Store,
tx: Sender<Result<i32>>,
rewind_state: Option<(RewindState, Result<(), Errno>)>,
rewind_state: Option<(RewindState, Bytes)>,
) {
// If we need to rewind then do so
let ctx = run.ctx;
if let Some((mut rewind_state, trigger_res)) = rewind_state {
if let Some((rewind_state, rewind_result)) = rewind_state {
if rewind_state.is_64bit {
if let Err(exit_code) =
rewind_state.rewinding_finish::<Memory64>(&ctx, &mut store, trigger_res)
{
tx.send(Ok(exit_code.raw())).ok();
return;
}
let res = rewind::<Memory64>(
let res = rewind_ext::<Memory64>(
ctx.env.clone().into_mut(&mut store),
rewind_state.memory_stack,
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
);
if res != Errno::Success {
tx.send(Ok(res as i32)).ok();
return;
}
} else {
if let Err(exit_code) =
rewind_state.rewinding_finish::<Memory32>(&ctx, &mut store, trigger_res)
{
tx.send(Ok(exit_code.raw())).ok();
return;
}
let res = rewind::<Memory32>(
let res = rewind_ext::<Memory32>(
ctx.env.clone().into_mut(&mut store),
rewind_state.memory_stack,
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
);
if res != Errno::Success {
tx.send(Ok(res as i32)).ok();
Expand Down Expand Up @@ -392,9 +383,15 @@ impl Wasi {
};

// Spawns the WASM process after a trigger
tasks
.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
.unwrap();
unsafe {
tasks.resume_wasm_after_poller(
Box::new(respawn),
ctx,
store,
deep.trigger,
)
}
.unwrap();
return;
}
Ok(err) => Err(err.into()),
Expand Down
6 changes: 6 additions & 0 deletions lib/wasi-types/src/wasi/bindings.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "enable-serde")]
use serde::{Deserialize, Serialize};
use std::mem::MaybeUninit;
use wasmer::{MemorySize, ValueType};
// TODO: Remove once bindings generate wai_bindgen_rust::bitflags::bitflags! (temp hack)
Expand Down Expand Up @@ -93,6 +95,7 @@ impl core::fmt::Debug for Clockid {
#[doc = " merely for alignment with POSIX."]
#[repr(u16)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum Errno {
#[doc = " No error occurred. System call completed successfully."]
Success,
Expand Down Expand Up @@ -875,6 +878,7 @@ pub type Userdata = u64;
#[doc = " Type of a subscription to an event or its occurrence."]
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum Eventtype {
#[doc = " The time value of clock `subscription_clock::id` has"]
#[doc = " reached timestamp `subscription_clock::timeout`."]
Expand Down Expand Up @@ -982,6 +986,7 @@ impl core::fmt::Debug for Preopentype {
wai_bindgen_rust::bitflags::bitflags! {
#[doc = " The state of the file descriptor subscribed to with"]
#[doc = " `eventtype::fd_read` or `eventtype::fd_write`."]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub struct Eventrwflags : u16 {
#[doc = " The peer of this socket has closed or disconnected."]
const FD_READWRITE_HANGUP = 1 << 0;
Expand All @@ -998,6 +1003,7 @@ impl Eventrwflags {
#[doc = " `eventtype::fd_write` variants"]
#[repr(C)]
#[derive(Copy, Clone)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub struct EventFdReadwrite {
#[doc = " The number of bytes available for reading or writing."]
pub nbytes: Filesize,
Expand Down
7 changes: 5 additions & 2 deletions lib/wasi-types/src/wasi/wasix_manual.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "enable-serde")]
use serde::{Deserialize, Serialize};
use std::mem::MaybeUninit;

use wasmer::{FromToNativeWasmType, MemorySize, ValueType};
Expand Down Expand Up @@ -179,7 +181,7 @@ unsafe impl ValueType for StackSnapshot {
#[repr(C)]
#[derive(Clone, Copy)]
pub union JoinStatusUnion {
pub nothing_errno: Errno,
pub nothing: u8,
pub exit_normal: Errno,
pub exit_signal: ErrnoSignal,
pub stopped: Signal,
Expand All @@ -196,7 +198,7 @@ impl core::fmt::Debug for JoinStatus {
let mut f = binding.field("tag", &self.tag);
f = unsafe {
match self.tag {
JoinStatusType::Nothing => f.field("nothing_errno", &self.u.nothing_errno),
JoinStatusType::Nothing => f.field("nothing", &self.u.nothing),
JoinStatusType::ExitNormal => f.field("exit_normal", &self.u.exit_normal),
JoinStatusType::ExitSignal => f.field("exit_signal", &self.u.exit_signal),
JoinStatusType::Stopped => f.field("stopped", &self.u.stopped),
Expand Down Expand Up @@ -242,6 +244,7 @@ unsafe impl<M: MemorySize> ValueType for ThreadStart<M> {
}

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "enable-serde", derive(Serialize, Deserialize))]
pub enum ExitCode {
Errno(Errno),
Other(i32),
Expand Down
2 changes: 2 additions & 0 deletions lib/wasi-web/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions lib/wasi-web/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use wasmer_wasix::{
},
SpawnMemoryType,
},
types::wasi::ExitCode,
wasmer::{AsJs, Memory, MemoryType, Module, Store, WASM_MAX_PAGES},
wasmer_wasix_types::wasi::Errno,
InstanceSnapshot, VirtualTaskManager, WasiEnv, WasiFunctionEnv, WasiThreadError,
Expand Down Expand Up @@ -79,7 +80,7 @@ struct WasmRunCommand {
snapshot: Option<InstanceSnapshot>,
trigger: Option<WasmRunTrigger>,
update_layout: bool,
result: Result<(), Errno>,
result: Option<Result<Bytes, ExitCode>>,
john-sharratt marked this conversation as resolved.
Show resolved Hide resolved
}

trait AssertSendSync: Send + Sync {}
Expand Down Expand Up @@ -320,7 +321,7 @@ impl WebThreadPool {
module_bytes,
snapshot,
update_layout,
result: Ok(()),
result: None,
});
let task = Box::into_raw(task);

Expand Down Expand Up @@ -687,7 +688,7 @@ pub fn schedule_wasm_task(
wasm_bindgen_futures::spawn_local(async move {
if let Some(trigger) = trigger {
let run = trigger.run;
task.result = run().await;
task.result = Some(run().await);
}

let task = Box::into_raw(task);
Expand Down
2 changes: 1 addition & 1 deletion lib/wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ cfg-if = "1.0"
thiserror = "1"
tracing = { version = "0.1" }
getrandom = "0.2"
wasmer-wasix-types = { path = "../wasi-types", version = "0.3.0" }
wasmer-wasix-types = { path = "../wasi-types", version = "0.3.0", features = [ "enable-serde" ] }
wasmer-types = { path = "../types", version = "=3.2.0", default-features = false }
wasmer = { path = "../api", version = "=3.2.0", default-features = false, features = ["wat", "js-serializable-module"] }
virtual-fs = { path = "../vfs", version = "0.2.0", default-features = false, features = ["webc-fs"] }
Expand Down
40 changes: 14 additions & 26 deletions lib/wasi/src/bin_factory/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::{pin::Pin, sync::Arc};
use crate::{
os::task::{thread::WasiThreadRunGuard, TaskJoinHandle},
runtime::task_manager::{TaskWasm, TaskWasmRunProperties},
syscalls::rewind,
syscalls::rewind_ext,
RewindState, VirtualBusError, WasiError, WasiRuntimeError,
};
use bytes::Bytes;
use futures::Future;
use tracing::*;
use wasmer::{Function, FunctionEnvMut, Memory32, Memory64, Module, Store};
Expand Down Expand Up @@ -96,9 +97,7 @@ pub fn spawn_exec_module(
// Perform the initialization
let ctx = {
// If this module exports an _initialize function, run that first.
if let Ok(initialize) = ctx
.data(&store)
.inner()
if let Ok(initialize) = unsafe { ctx.data(&store).inner() }
.instance
.exports
.get_function("_initialize")
Expand Down Expand Up @@ -136,8 +135,7 @@ pub fn spawn_exec_module(
}

fn get_start(ctx: &WasiFunctionEnv, store: &Store) -> Option<Function> {
ctx.data(store)
.inner()
unsafe { ctx.data(store).inner() }
.instance
.exports
.get_function("_start")
Expand All @@ -150,44 +148,34 @@ fn call_module(
ctx: WasiFunctionEnv,
mut store: Store,
handle: WasiThreadRunGuard,
rewind_state: Option<(RewindState, Result<(), Errno>)>,
rewind_state: Option<(RewindState, Bytes)>,
) {
let env = ctx.data(&store);
let pid = env.pid();
let tasks = env.tasks().clone();
handle.thread.set_status_running();

// If we need to rewind then do so
if let Some((mut rewind_state, trigger_res)) = rewind_state {
if let Some((rewind_state, rewind_result)) = rewind_state {
if rewind_state.is_64bit {
if let Err(exit_code) =
rewind_state.rewinding_finish::<Memory64>(&ctx, &mut store, trigger_res)
{
ctx.data(&store).blocking_cleanup(Some(exit_code));
return;
}
let res = rewind::<Memory64>(
let res = rewind_ext::<Memory64>(
ctx.env.clone().into_mut(&mut store),
rewind_state.memory_stack,
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
);
if res != Errno::Success {
ctx.data(&store).blocking_cleanup(Some(res.into()));
return;
}
} else {
if let Err(exit_code) =
rewind_state.rewinding_finish::<Memory32>(&ctx, &mut store, trigger_res)
{
ctx.data(&store).blocking_cleanup(Some(exit_code));
return;
}
let res = rewind::<Memory32>(
let res = rewind_ext::<Memory32>(
ctx.env.clone().into_mut(&mut store),
rewind_state.memory_stack,
rewind_state.rewind_stack,
rewind_state.store_data,
rewind_result,
);
if res != Errno::Success {
ctx.data(&store).blocking_cleanup(Some(res.into()));
Expand Down Expand Up @@ -221,16 +209,16 @@ fn call_module(
// Create the callback that will be invoked when the thread respawns after a deep sleep
let rewind = deep.rewind;
let respawn = {
move |ctx, store, trigger_res| {
move |ctx, store, rewind_result| {
// Call the thread
call_module(ctx, store, handle, Some((rewind, trigger_res)));
call_module(ctx, store, handle, Some((rewind, rewind_result)));
}
};

// Spawns the WASM process after a trigger
if let Err(err) =
if let Err(err) = unsafe {
tasks.resume_wasm_after_poller(Box::new(respawn), ctx, store, deep.trigger)
{
} {
debug!("failed to go into deep sleep - {}", err);
}
return;
Expand Down
Loading