Skip to content

Commit

Permalink
Modified the store so that it can be Cloned
Browse files Browse the repository at this point in the history
Added a Reactor that drives WebAssembly react pattern
Fixed multithreading
Added thread local storage
Implemented the remaining functionality for the bus
  • Loading branch information
john-sharratt committed Jul 22, 2022
1 parent 43138b5 commit af5663b
Show file tree
Hide file tree
Showing 69 changed files with 2,860 additions and 3,657 deletions.
82 changes: 62 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions docs/migration_to_3.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,13 @@ import_object.define("env", "host_function", host_function);
let instance = Instance::new(&mut store, &module, &import_object).expect("Could not instantiate module.");
```

For WASI, don't forget to import memory to `WasiEnv`
For WASI, don't forget to initialize it

```rust
let mut wasi_env = WasiState::new("hello").finalize()?;
let import_object = wasi_env.import_object(&mut store, &module)?;
let instance = Instance::new(&mut store, &module, &import_object).expect("Could not instantiate module.");
let memory = instance.exports.get_memory("memory")?;
wasi_env.data_mut(&mut store).set_memory(memory.clone());
wasi_env.initialize(&mut store, &instance).unwrap();
```

#### `ChainableNamedResolver` is removed
Expand Down
4 changes: 2 additions & 2 deletions examples/imports_function_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// This struct may have been anything. The only constraint is it must be
// possible to know the size of the `Env` at compile time (i.e it has to
// implement the `Sized` trait).
// The Env is then accessed using `data()` or `data_mut()` method.
// The Env is then accessed using `data()` method.
#[derive(Clone)]
struct Env {
counter: Arc<Mutex<i32>>,
Expand All @@ -82,7 +82,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
*env.data().counter.lock().unwrap()
}
fn add_to_counter(mut env: FunctionEnvMut<Env>, add: i32) -> i32 {
let mut counter_ref = env.data_mut().counter.lock().unwrap();
let mut counter_ref = env.data().counter.lock().unwrap();

*counter_ref += add;
*counter_ref
Expand Down
6 changes: 2 additions & 4 deletions examples/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//!
//! Ready?
use wasmer::{FunctionEnv, Instance, Module, Store};
use wasmer::{Instance, Module, Store};
use wasmer_compiler::Universal;
use wasmer_compiler_cranelift::Cranelift;
use wasmer_wasi::WasiState;
Expand Down Expand Up @@ -52,9 +52,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let instance = Instance::new(&mut store, &module, &import_object)?;

println!("Attach WASI memory...");
// Attach the memory export
let memory = instance.exports.get_memory("memory")?;
wasi_env.data_mut(&mut store).set_memory(memory.clone());
wasi_env.initialize(&mut store, &instance).unwrap();

println!("Call WASI `_start` function...");
// And we just call the `_start` function!
Expand Down
2 changes: 2 additions & 0 deletions lib/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ indexmap = { version = "1.6", features = ["serde-1"] }
cfg-if = "1.0"
thiserror = "1.0"
more-asserts = "0.2"
tracing = "0.1"
waker-fn = { version = "1.1" }
# - Optional shared dependencies.
wat = { version = "1.0", optional = true }

Expand Down
88 changes: 88 additions & 0 deletions lib/api/src/common/reactors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::mpsc;
use std::sync::Arc;
use std::task::Waker;
use std::time::Duration;

/// Reactor pattern implementation that allows for web assembly
/// processes to easily implement asynchronous IO
#[derive(Debug, Clone)]
pub struct Reactors
{
waker: Waker,
woken: Arc<AtomicBool>,
waiting: Arc<Mutex<VecDeque<mpsc::Sender<()>>>>,
}

impl Default
for Reactors {
fn default() -> Self {
let woken = Arc::new(AtomicBool::new(false));
let waiting: Arc<Mutex<VecDeque<mpsc::Sender<()>>>> = Default::default();

let waker = {
let woken = woken.clone();
let waiting = Arc::downgrade(&waiting);
waker_fn::waker_fn(move || {
if let Some(waiting) = waiting.upgrade() {
let mut guard = waiting.lock().unwrap();
woken.store(true, Ordering::Release);
if let Some(reactor) = guard.pop_front() {
let _ = reactor.send(());
}
}
})
};

Self {
waker,
woken,
waiting,
}
}
}

impl Reactors
{
/// Gets a reference to the waker that can be used for
/// asynchronous calls
pub fn get_waker(&self) -> Waker {
self.waker.clone()
}

/// Wakes one of the reactors thats currently waiting
pub fn wake(&self) {
self.waker.wake_by_ref();
}

/// Wakes all of the reactors thats currently waiting
pub fn wake_all(&self) {
let mut guard = self.waiting.lock().unwrap();
self.woken.store(true, Ordering::Release);
guard.clear();
}

/// Returns true if woken, otherwise false for timeout
pub fn wait(&self, timeout: Duration) -> bool {
let rx = {
let mut guard = self.waiting.lock().unwrap();
if self.woken.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire).is_ok() {
return true;
}
if timeout.is_zero() {
return false;
}

let (tx, rx) = mpsc::channel();
guard.push_back(tx);
rx
};
match rx.recv_timeout(timeout) {
Ok(_) => true,
Err(_) => false,
}
}
}
4 changes: 3 additions & 1 deletion lib/api/src/js/function_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ impl<T: Send + 'static> FunctionEnvMut<'_, T> {
}

/// Returns a mutable- reference to the host state in this context.
pub fn data_mut<'a>(&'a mut self) -> &'a mut T {
/// (this will only return a mutable reference as long as the environment
/// has not been cloned - environments are cloned during multithreading)
pub fn data_mut<'a>(&'a mut self) -> Option<&'a mut T> {
self.func_env.as_mut(&mut self.store_mut)
}

Expand Down
Loading

0 comments on commit af5663b

Please sign in to comment.