Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wasix multithreading fix #3598

Merged
merged 2 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ sha2 = { version = "0.10" }
waker-fn = { version = "1.1" }
cooked-waker = "^5"
rand = "0.8"
tokio = { version = "1", features = ["sync", "macros", "time"], default_features = false }
tokio = { version = "1", features = ["sync", "macros", "time", "rt"], default_features = false }
futures = { version = "0.3" }
# used by feature='os'
async-trait = { version = "^0.1" }
Expand Down
6 changes: 1 addition & 5 deletions lib/wasi/src/os/task/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,7 @@ impl WasiProcess {
inner.threads.insert(id, ctrl.clone());
inner.thread_count += 1;

Ok(WasiThreadHandle {
id: Arc::new(id),
thread: ctrl,
inner: self.inner.clone(),
})
Ok(WasiThreadHandle::new(ctrl, &self.inner))
}

/// Gets a reference to a particular thread
Expand Down
46 changes: 28 additions & 18 deletions lib/wasi/src/os/task/thread.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::{Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock, Weak},
task::Waker,
};

Expand Down Expand Up @@ -379,29 +379,45 @@ impl WasiThread {
}
}

#[derive(Debug)]
pub struct WasiThreadHandleProtected {
thread: WasiThread,
inner: Weak<RwLock<WasiProcessInner>>,
}

#[derive(Debug, Clone)]
pub struct WasiThreadHandle {
pub(super) id: Arc<WasiThreadId>,
pub(super) thread: WasiThread,
pub(super) inner: Arc<RwLock<WasiProcessInner>>,
protected: Arc<WasiThreadHandleProtected>,
}

impl WasiThreadHandle {
pub(crate) fn new(
thread: WasiThread,
inner: &Arc<RwLock<WasiProcessInner>>,
) -> WasiThreadHandle {
Self {
protected: Arc::new(WasiThreadHandleProtected {
thread,
inner: Arc::downgrade(inner),
}),
}
}

pub fn id(&self) -> WasiThreadId {
self.id.0.into()
self.protected.thread.tid()
}

pub fn as_thread(&self) -> WasiThread {
self.thread.clone()
self.protected.thread.clone()
}
}

impl Drop for WasiThreadHandle {
impl Drop for WasiThreadHandleProtected {
fn drop(&mut self) {
// We do this so we track when the last handle goes out of scope
if let Some(id) = Arc::get_mut(&mut self.id) {
let mut inner = self.inner.write().unwrap();
if let Some(ctrl) = inner.threads.remove(id) {
let id = self.thread.tid();
if let Some(inner) = Weak::upgrade(&self.inner) {
let mut inner = inner.write().unwrap();
if let Some(ctrl) = inner.threads.remove(&id) {
ctrl.set_status_finished(Ok(0));
}
inner.thread_count -= 1;
Expand All @@ -413,13 +429,7 @@ impl std::ops::Deref for WasiThreadHandle {
type Target = WasiThread;

fn deref(&self) -> &Self::Target {
&self.thread
}
}

impl std::ops::DerefMut for WasiThreadHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.thread
&self.protected.thread
}
}

Expand Down
26 changes: 13 additions & 13 deletions lib/wasi/src/syscalls/wasix/futex_wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,10 @@ pub fn futex_wait<M: MemorySize>(
timeout: WasmPtr<OptionTimestamp, M>,
ret_woken: WasmPtr<Bool, M>,
) -> Result<Errno, WasiError> {
trace!(
"wasi[{}:{}]::futex_wait(offset={})",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset()
);

wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);

let mut env = ctx.data();
let state = env.state.clone();

let futex_idx: u64 = wasi_try_ok!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));

// Determine the timeout
let mut env = ctx.data();
let timeout = {
let memory = env.memory_view(&ctx);
wasi_try_mem_ok!(timeout.read(&memory))
Expand All @@ -97,6 +86,17 @@ pub fn futex_wait<M: MemorySize>(
_ => None,
};

trace!(
"wasi[{}:{}]::futex_wait(offset={}, timeout={:?})",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset(),
timeout
);

let state = env.state.clone();
let futex_idx: u64 = wasi_try_ok!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));

// Create a poller which will register ourselves against
// this futex event and check when it has changed
let view = env.memory_view(&ctx);
Expand All @@ -123,6 +123,6 @@ pub fn futex_wait<M: MemorySize>(
};
let memory = env.memory_view(&ctx);
let mut env = ctx.data();
wasi_try_mem_ok!(ret_woken.write(&memory, Bool::False));
wasi_try_mem_ok!(ret_woken.write(&memory, woken));
Ok(ret)
}
43 changes: 29 additions & 14 deletions lib/wasi/src/syscalls/wasix/thread_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@ pub fn thread_spawn<M: MemorySize>(
reactor: Bool,
ret_tid: WasmPtr<Tid, M>,
) -> Errno {
debug!(
"wasi[{}:{}]::thread_spawn (reactor={:?}, thread_id={}, stack_base={}, caller_id={})",
ctx.data().pid(),
ctx.data().tid(),
reactor,
ctx.data().thread.tid().raw(),
stack_base,
current_caller_id().raw()
);

// Now we use the environment and memory references
let env = ctx.data();
let memory = env.memory_view(&ctx);
Expand All @@ -53,11 +43,10 @@ pub fn thread_spawn<M: MemorySize>(
Ok(h) => h,
Err(err) => {
error!(
"wasi[{}:{}]::thread_spawn (reactor={:?}, thread_id={}, stack_base={}, caller_id={}) - failed to create thread handle: {}",
"wasi[{}:{}]::thread_spawn (reactor={:?}, stack_base={}, caller_id={}) - failed to create thread handle: {}",
ctx.data().pid(),
ctx.data().tid(),
reactor,
ctx.data().thread.tid().raw(),
stack_base,
current_caller_id().raw(),
err
Expand All @@ -68,6 +57,16 @@ pub fn thread_spawn<M: MemorySize>(
};
let thread_id: Tid = thread_handle.id().into();

debug!(
%thread_id,
"wasi[{}:{}]::thread_spawn (reactor={:?}, stack_base={}, caller_id={})",
ctx.data().pid(),
ctx.data().tid(),
reactor,
stack_base,
current_caller_id().raw()
);

// We need a copy of the process memory and a packaged store in order to
// launch threads and reactors
let thread_memory = wasi_try!(ctx.data().memory().try_clone(&ctx).ok_or_else(|| {
Expand Down Expand Up @@ -145,18 +144,28 @@ pub fn thread_spawn<M: MemorySize>(
let user_data_low: u32 = (user_data & 0xFFFFFFFF) as u32;
let user_data_high: u32 = (user_data >> 32) as u32;

trace!(
%user_data,
"wasi[{}:{}]::thread_spawn spawn.call()",
ctx.data(&store).pid(),
ctx.data(&store).tid(),
);

let mut ret = Errno::Success;
if let Err(err) = spawn.call(store, user_data_low as i32, user_data_high as i32) {
match err.downcast::<WasiError>() {
Ok(WasiError::Exit(0)) => ret = Errno::Success,
Ok(WasiError::Exit(code)) => {
debug!(
%code,
"wasi[{}:{}]::thread_spawn - thread exited",
ctx.data(&store).pid(),
ctx.data(&store).tid(),
);
ret = Errno::Noexec;
ret = if code == 0 {
Errno::Success
} else {
Errno::Noexec
};
}
Ok(WasiError::UnknownWasiVersion) => {
debug!(
Expand Down Expand Up @@ -219,6 +228,12 @@ pub fn thread_spawn<M: MemorySize>(
if let Some(thread) = thread {
let mut store = thread.store.borrow_mut();
let ret = call_module(&thread.ctx, store.deref_mut());

{
let mut guard = state.threading.write().unwrap();
guard.thread_ctx.remove(&caller_id);
}

return ret;
}

Expand Down