Skip to content

Commit

Permalink
Fixed multi-threading bugs
Browse files Browse the repository at this point in the history
There was a thread corruption issue causes when real threads spawned a thread
multiple times. This occured because the thread context was being reused
from previously exited threads.

Also there was a bug in the futex_wait callback where it was not correctly
returning the woken event
  • Loading branch information
john-sharratt authored and theduke committed Feb 27, 2023
1 parent bc2a2dd commit f07e51c
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 50 deletions.
6 changes: 1 addition & 5 deletions lib/wasi/src/os/task/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,7 @@ impl WasiProcess {
inner.threads.insert(id, ctrl.clone());
inner.thread_count += 1;

Ok(WasiThreadHandle {
id: Arc::new(id),
thread: ctrl,
inner: self.inner.clone(),
})
Ok(WasiThreadHandle::new(ctrl, &self.inner))
}

/// Gets a reference to a particular thread
Expand Down
46 changes: 28 additions & 18 deletions lib/wasi/src/os/task/thread.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::{Arc, Mutex, RwLock},
sync::{Arc, Mutex, RwLock, Weak},
task::Waker,
};

Expand Down Expand Up @@ -379,29 +379,45 @@ impl WasiThread {
}
}

#[derive(Debug)]
pub struct WasiThreadHandleProtected {
thread: WasiThread,
inner: Weak<RwLock<WasiProcessInner>>,
}

#[derive(Debug, Clone)]
pub struct WasiThreadHandle {
pub(super) id: Arc<WasiThreadId>,
pub(super) thread: WasiThread,
pub(super) inner: Arc<RwLock<WasiProcessInner>>,
protected: Arc<WasiThreadHandleProtected>,
}

impl WasiThreadHandle {
pub(crate) fn new(
thread: WasiThread,
inner: &Arc<RwLock<WasiProcessInner>>,
) -> WasiThreadHandle {
Self {
protected: Arc::new(WasiThreadHandleProtected {
thread,
inner: Arc::downgrade(inner),
}),
}
}

pub fn id(&self) -> WasiThreadId {
self.id.0.into()
self.protected.thread.tid()
}

pub fn as_thread(&self) -> WasiThread {
self.thread.clone()
self.protected.thread.clone()
}
}

impl Drop for WasiThreadHandle {
impl Drop for WasiThreadHandleProtected {
fn drop(&mut self) {
// We do this so we track when the last handle goes out of scope
if let Some(id) = Arc::get_mut(&mut self.id) {
let mut inner = self.inner.write().unwrap();
if let Some(ctrl) = inner.threads.remove(id) {
let id = self.thread.tid();
if let Some(inner) = Weak::upgrade(&self.inner) {
let mut inner = inner.write().unwrap();
if let Some(ctrl) = inner.threads.remove(&id) {
ctrl.set_status_finished(Ok(0));
}
inner.thread_count -= 1;
Expand All @@ -413,13 +429,7 @@ impl std::ops::Deref for WasiThreadHandle {
type Target = WasiThread;

fn deref(&self) -> &Self::Target {
&self.thread
}
}

impl std::ops::DerefMut for WasiThreadHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.thread
&self.protected.thread
}
}

Expand Down
26 changes: 13 additions & 13 deletions lib/wasi/src/syscalls/wasix/futex_wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,10 @@ pub fn futex_wait<M: MemorySize>(
timeout: WasmPtr<OptionTimestamp, M>,
ret_woken: WasmPtr<Bool, M>,
) -> Result<Errno, WasiError> {
trace!(
"wasi[{}:{}]::futex_wait(offset={})",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset()
);

wasi_try_ok!(WasiEnv::process_signals_and_exit(&mut ctx)?);

let mut env = ctx.data();
let state = env.state.clone();

let futex_idx: u64 = wasi_try_ok!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));

// Determine the timeout
let mut env = ctx.data();
let timeout = {
let memory = env.memory_view(&ctx);
wasi_try_mem_ok!(timeout.read(&memory))
Expand All @@ -97,6 +86,17 @@ pub fn futex_wait<M: MemorySize>(
_ => None,
};

trace!(
"wasi[{}:{}]::futex_wait(offset={}, timeout={:?})",
ctx.data().pid(),
ctx.data().tid(),
futex_ptr.offset(),
timeout
);

let state = env.state.clone();
let futex_idx: u64 = wasi_try_ok!(futex_ptr.offset().try_into().map_err(|_| Errno::Overflow));

// Create a poller which will register ourselves against
// this futex event and check when it has changed
let view = env.memory_view(&ctx);
Expand All @@ -123,6 +123,6 @@ pub fn futex_wait<M: MemorySize>(
};
let memory = env.memory_view(&ctx);
let mut env = ctx.data();
wasi_try_mem_ok!(ret_woken.write(&memory, Bool::False));
wasi_try_mem_ok!(ret_woken.write(&memory, woken));
Ok(ret)
}
43 changes: 29 additions & 14 deletions lib/wasi/src/syscalls/wasix/thread_spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@ pub fn thread_spawn<M: MemorySize>(
reactor: Bool,
ret_tid: WasmPtr<Tid, M>,
) -> Errno {
debug!(
"wasi[{}:{}]::thread_spawn (reactor={:?}, thread_id={}, stack_base={}, caller_id={})",
ctx.data().pid(),
ctx.data().tid(),
reactor,
ctx.data().thread.tid().raw(),
stack_base,
current_caller_id().raw()
);

// Now we use the environment and memory references
let env = ctx.data();
let memory = env.memory_view(&ctx);
Expand All @@ -53,11 +43,10 @@ pub fn thread_spawn<M: MemorySize>(
Ok(h) => h,
Err(err) => {
error!(
"wasi[{}:{}]::thread_spawn (reactor={:?}, thread_id={}, stack_base={}, caller_id={}) - failed to create thread handle: {}",
"wasi[{}:{}]::thread_spawn (reactor={:?}, stack_base={}, caller_id={}) - failed to create thread handle: {}",
ctx.data().pid(),
ctx.data().tid(),
reactor,
ctx.data().thread.tid().raw(),
stack_base,
current_caller_id().raw(),
err
Expand All @@ -68,6 +57,16 @@ pub fn thread_spawn<M: MemorySize>(
};
let thread_id: Tid = thread_handle.id().into();

debug!(
%thread_id,
"wasi[{}:{}]::thread_spawn (reactor={:?}, stack_base={}, caller_id={})",
ctx.data().pid(),
ctx.data().tid(),
reactor,
stack_base,
current_caller_id().raw()
);

// We need a copy of the process memory and a packaged store in order to
// launch threads and reactors
let thread_memory = wasi_try!(ctx.data().memory().try_clone(&ctx).ok_or_else(|| {
Expand Down Expand Up @@ -145,18 +144,28 @@ pub fn thread_spawn<M: MemorySize>(
let user_data_low: u32 = (user_data & 0xFFFFFFFF) as u32;
let user_data_high: u32 = (user_data >> 32) as u32;

trace!(
%user_data,
"wasi[{}:{}]::thread_spawn spawn.call()",
ctx.data(&store).pid(),
ctx.data(&store).tid(),
);

let mut ret = Errno::Success;
if let Err(err) = spawn.call(store, user_data_low as i32, user_data_high as i32) {
match err.downcast::<WasiError>() {
Ok(WasiError::Exit(0)) => ret = Errno::Success,
Ok(WasiError::Exit(code)) => {
debug!(
%code,
"wasi[{}:{}]::thread_spawn - thread exited",
ctx.data(&store).pid(),
ctx.data(&store).tid(),
);
ret = Errno::Noexec;
ret = if code == 0 {
Errno::Success
} else {
Errno::Noexec
};
}
Ok(WasiError::UnknownWasiVersion) => {
debug!(
Expand Down Expand Up @@ -219,6 +228,12 @@ pub fn thread_spawn<M: MemorySize>(
if let Some(thread) = thread {
let mut store = thread.store.borrow_mut();
let ret = call_module(&thread.ctx, store.deref_mut());

{
let mut guard = state.threading.write().unwrap();
guard.thread_ctx.remove(&caller_id);
}

return ret;
}

Expand Down

0 comments on commit f07e51c

Please sign in to comment.