Skip to content

Commit

Permalink
Switched ModuleCache over to the new CompiledModuleCache infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-F-Bryan committed May 3, 2023
1 parent b4e1389 commit 7b71bec
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 115 deletions.
18 changes: 16 additions & 2 deletions lib/wasi/src/bin_factory/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ pub fn spawn_exec(
// The deterministic id for this engine
let compiler = store.engine().deterministic_id();

let module = compiled_modules.get_compiled_module(&store, binary.hash().as_str(), compiler);
let tasks = runtime.task_manager();

let module = tasks.block_on(compiled_modules.get_compiled_module(
&store,
binary.hash().as_str(),
compiler,
&**tasks,
));

let module = match (module, binary.entry.as_ref()) {
(Some(a), _) => a,
(None, Some(entry)) => {
Expand All @@ -43,7 +51,13 @@ pub fn spawn_exec(
env.blocking_cleanup(Some(Errno::Noexec.into()));
}
let module = module?;
compiled_modules.set_compiled_module(binary.hash().as_str(), compiler, &module);

tasks.block_on(compiled_modules.set_compiled_module(
binary.hash().as_str(),
compiler,
&module,
&**tasks,
));
module
}
(None, None) => {
Expand Down
130 changes: 26 additions & 104 deletions lib/wasi/src/bin_factory/module_cache.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use std::{cell::RefCell, collections::HashMap, ops::DerefMut, path::PathBuf, sync::RwLock};
use std::path::PathBuf;

use anyhow::Context;
use wasmer::Module;

use super::BinaryPackage;
use crate::WasiRuntime;
use crate::{runtime::module_cache::CompiledModuleCache, VirtualTaskManager, WasiRuntime};

pub const DEFAULT_COMPILED_PATH: &str = "~/.wasmer/compiled";

#[derive(Debug)]
pub struct ModuleCache {
pub(crate) cache_compile_dir: PathBuf,
pub(crate) cached_modules: Option<RwLock<HashMap<String, Module>>>,
}
pub struct ModuleCache(Box<dyn CompiledModuleCache>);

// FIXME: remove impls!
// Added as a stopgap to get the crate to compile again with the "js" feature.
Expand All @@ -28,31 +25,18 @@ impl Default for ModuleCache {
}
}

thread_local! {
static THREAD_LOCAL_CACHED_MODULES: std::cell::RefCell<HashMap<String, Module>>
= RefCell::new(HashMap::new());
}

impl ModuleCache {
/// Create a new [`ModuleCache`].
///
/// use_shared_cache enables a shared cache of modules in addition to a thread-local cache.
pub fn new(cache_compile_dir: Option<PathBuf>, use_shared_cache: bool) -> ModuleCache {
pub fn new(cache_compile_dir: Option<PathBuf>, _use_shared_cache: bool) -> ModuleCache {
let cache_compile_dir = cache_compile_dir.unwrap_or_else(|| {
PathBuf::from(shellexpand::tilde(DEFAULT_COMPILED_PATH).into_owned())
});
let _ = std::fs::create_dir_all(&cache_compile_dir);

let cached_modules = if use_shared_cache {
Some(RwLock::new(HashMap::default()))
} else {
None
};
let cache = crate::runtime::module_cache::default_cache(&cache_compile_dir);

ModuleCache {
cached_modules,
cache_compile_dir,
}
ModuleCache(Box::new(cache))
}

pub fn get_webc(
Expand All @@ -75,99 +59,37 @@ impl ModuleCache {
})
}

pub fn get_compiled_module(
pub async fn get_compiled_module(
&self,
engine: &impl wasmer::AsEngineRef,
data_hash: &str,
compiler: &str,
task_manager: &dyn VirtualTaskManager,
) -> Option<Module> {
let key = format!("{}-{}", data_hash, compiler);

// fastest path
{
let module = THREAD_LOCAL_CACHED_MODULES.with(|cache| {
let cache = cache.borrow();
cache.get(&key).cloned()
});
if let Some(module) = module {
return Some(module);
}
}

// fast path
if let Some(cache) = &self.cached_modules {
let cache = cache.read().unwrap();
if let Some(module) = cache.get(&key) {
THREAD_LOCAL_CACHED_MODULES.with(|cache| {
let mut cache = cache.borrow_mut();
cache.insert(key.clone(), module.clone());
});
return Some(module.clone());
}
}

// slow path
let path = self.cache_compile_dir.join(format!("{}.bin", key).as_str());
if let Ok(data) = std::fs::read(path.as_path()) {
tracing::trace!("bin file found: {:?} [len={}]", path.as_path(), data.len());
let mut decoder = weezl::decode::Decoder::new(weezl::BitOrder::Msb, 8);
if let Ok(data) = decoder.decode(&data[..]) {
let module_bytes = bytes::Bytes::from(data);

// Load the module
let module = match Module::deserialize_checked(engine, &module_bytes[..]) {
Ok(m) => m,
Err(err) => {
tracing::error!(
"failed to deserialize module with hash '{data_hash}': {err}"
);
return None;
}
};

if let Some(cache) = &self.cached_modules {
let mut cache = cache.write().unwrap();
cache.insert(key.clone(), module.clone());
}

THREAD_LOCAL_CACHED_MODULES.with(|cache| {
let mut cache = cache.borrow_mut();
cache.insert(key.clone(), module.clone());
});
return Some(module);
}
}

// Not found
tracing::trace!("bin file not found: {:?}", path);
None
self.0
.load(&key, engine.as_engine_ref().engine(), task_manager)
.await
.ok()
}

pub fn set_compiled_module(&self, data_hash: &str, compiler: &str, module: &Module) {
pub async fn set_compiled_module(
&self,
data_hash: &str,
compiler: &str,
module: &Module,
task_manager: &dyn VirtualTaskManager,
) {
let key = format!("{}-{}", data_hash, compiler);

// Add the module to the local thread cache
THREAD_LOCAL_CACHED_MODULES.with(|cache| {
let mut cache = cache.borrow_mut();
let cache = cache.deref_mut();
cache.insert(key.clone(), module.clone());
});

// Serialize the compiled module into bytes and insert it into the cache
if let Some(cache) = &self.cached_modules {
let mut cache = cache.write().unwrap();
cache.insert(key.clone(), module.clone());
}

// We should also attempt to store it in the cache directory
let compiled_bytes = module.serialize().unwrap();

let path = self.cache_compile_dir.join(format!("{}.bin", key).as_str());
// TODO: forward error!
let _ = std::fs::create_dir_all(path.parent().unwrap());
let mut encoder = weezl::encode::Encoder::new(weezl::BitOrder::Msb, 8);
if let Ok(compiled_bytes) = encoder.encode(&compiled_bytes[..]) {
let _ = std::fs::write(path, &compiled_bytes[..]);
if let Err(e) = self.0.save(&key, module, task_manager).await {
tracing::warn!(
data_hash,
compiler,
error = &e as &dyn std::error::Error,
"Unable to cache the module",
);
}
}
}
Expand Down
74 changes: 65 additions & 9 deletions lib/wasi/src/runtime/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#[cfg(feature = "sys-thread")]
pub mod tokio;

use std::{pin::Pin, time::Duration};
use std::{ops::Deref, pin::Pin, time::Duration};

use ::tokio::runtime::Handle;
use futures::Future;
Expand Down Expand Up @@ -79,6 +79,64 @@ pub trait VirtualTaskManager: std::fmt::Debug + Send + Sync + 'static {
fn thread_parallelism(&self) -> Result<usize, WasiThreadError>;
}

#[async_trait::async_trait]
impl<D, T> VirtualTaskManager for D
where
D: Deref<Target = T> + std::fmt::Debug + Send + Sync + 'static,
T: VirtualTaskManager + ?Sized,
{
fn build_memory(
&self,
store: &mut StoreMut,
spawn_type: SpawnType,
) -> Result<Option<Memory>, WasiThreadError> {
(**self).build_memory(store, spawn_type)
}

async fn sleep_now(&self, time: Duration) {
(**self).sleep_now(time).await
}

fn task_shared(
&self,
task: Box<
dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static,
>,
) -> Result<(), WasiThreadError> {
(**self).task_shared(task)
}

fn runtime(&self) -> &Handle {
(**self).runtime()
}

#[allow(dyn_drop)]
fn runtime_enter<'g>(&'g self) -> Box<dyn std::ops::Drop + 'g> {
(**self).runtime_enter()
}

fn task_wasm(
&self,
task: Box<dyn FnOnce(Store, Module, Option<Memory>) + Send + 'static>,
store: Store,
module: Module,
spawn_type: SpawnType,
) -> Result<(), WasiThreadError> {
(**self).task_wasm(task, store, module, spawn_type)
}

fn task_dedicated(
&self,
task: Box<dyn FnOnce() + Send + 'static>,
) -> Result<(), WasiThreadError> {
(**self).task_dedicated(task)
}

fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
(**self).thread_parallelism()
}
}

impl dyn VirtualTaskManager {
/// Execute a future and return the output.
/// This method blocks until the future is complete.
Expand All @@ -93,14 +151,12 @@ pub trait VirtualTaskManagerExt {
fn block_on<'a, A>(&self, task: impl Future<Output = A> + 'a) -> A;
}

impl<'a, T: VirtualTaskManager> VirtualTaskManagerExt for &'a T {
fn block_on<'x, A>(&self, task: impl Future<Output = A> + 'x) -> A {
self.runtime().block_on(task)
}
}

impl<T: VirtualTaskManager + ?Sized> VirtualTaskManagerExt for std::sync::Arc<T> {
fn block_on<'x, A>(&self, task: impl Future<Output = A> + 'x) -> A {
impl<D, T> VirtualTaskManagerExt for D
where
D: Deref<Target = T>,
T: VirtualTaskManager + ?Sized,
{
fn block_on<'a, A>(&self, task: impl Future<Output = A> + 'a) -> A {
self.runtime().block_on(task)
}
}

0 comments on commit 7b71bec

Please sign in to comment.