Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a module cache abstraction #3841

Merged
merged 18 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions lib/wasi/src/bin_factory/binary_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use semver::Version;
use virtual_fs::FileSystem;
use webc::compat::SharedBytes;

use crate::runtime::module_cache::Key;
use crate::runtime::module_cache::ModuleHash;

#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct BinaryPackageCommand {
name: String,
#[derivative(Debug = "ignore")]
pub(crate) atom: SharedBytes,
hash: OnceCell<Key>,
hash: OnceCell<ModuleHash>,
}

impl BinaryPackageCommand {
Expand All @@ -38,8 +38,8 @@ impl BinaryPackageCommand {
&self.atom
}

pub fn hash(&self) -> &Key {
self.hash.get_or_init(|| Key::sha256(self.atom()))
pub fn hash(&self) -> &ModuleHash {
self.hash.get_or_init(|| ModuleHash::sha256(self.atom()))
}
}

Expand All @@ -55,7 +55,7 @@ pub struct BinaryPackage {
pub when_cached: Option<u128>,
#[derivative(Debug = "ignore")]
pub entry: Option<SharedBytes>,
pub hash: OnceCell<Key>,
pub hash: OnceCell<ModuleHash>,
pub webc_fs: Option<Arc<dyn FileSystem + Send + Sync + 'static>>,
pub commands: Arc<RwLock<Vec<BinaryPackageCommand>>>,
pub uses: Vec<String>,
Expand All @@ -65,12 +65,12 @@ pub struct BinaryPackage {
}

impl BinaryPackage {
pub fn hash(&self) -> Key {
pub fn hash(&self) -> ModuleHash {
*self.hash.get_or_init(|| {
if let Some(entry) = self.entry.as_ref() {
Key::sha256(entry)
ModuleHash::sha256(entry)
} else {
Key::sha256(self.package_name.as_bytes())
ModuleHash::sha256(self.package_name.as_bytes())
}
})
}
Expand Down
7 changes: 2 additions & 5 deletions lib/wasi/src/bin_factory/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ pub async fn spawn_exec(
env: WasiEnv,
runtime: &Arc<dyn WasiRuntime + Send + Sync + 'static>,
) -> Result<TaskJoinHandle, VirtualBusError> {
// The deterministic id for this engine
let compiler = store.engine().deterministic_id();

let key = binary.hash().combined_with(compiler);
let key = binary.hash();

let compiled_modules = runtime.module_cache();
let module = compiled_modules.load(key, store.engine()).await.ok();
Expand All @@ -49,7 +46,7 @@ pub async fn spawn_exec(
}
let module = module?;

if let Err(e) = compiled_modules.save(key, &module).await {
if let Err(e) = compiled_modules.save(key, store.engine(), &module).await {
tracing::debug!(
%key,
package_name=%binary.package_name,
Expand Down
6 changes: 1 addition & 5 deletions lib/wasi/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ where
fn package_resolver(&self) -> Arc<dyn PackageResolver + Send + Sync>;

/// A cache for compiled modules.
///
/// Caching is disabled by default.
fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync> {
Arc::new(module_cache::Disabled)
}
fn module_cache(&self) -> Arc<dyn ModuleCache + Send + Sync>;

/// Get a [`wasmer::Engine`] for module compilation.
fn engine(&self) -> Option<wasmer::Engine> {
Expand Down
17 changes: 11 additions & 6 deletions lib/wasi/src/runtime/module_cache/and_then.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use wasmer::{Engine, Module};

use crate::runtime::module_cache::{CacheError, Key, ModuleCache};
use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash};

/// A [`ModuleCache`] combinator which will try operations on one cache
/// and fall back to a secondary cache if they fail.
Expand Down Expand Up @@ -45,7 +45,7 @@ where
Primary: ModuleCache + Send + Sync,
Secondary: ModuleCache + Send + Sync,
{
async fn load(&self, key: Key, engine: &Engine) -> Result<Module, CacheError> {
async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
let primary_error = match self.primary.load(key, engine).await {
Ok(m) => return Ok(m),
Err(e) => e,
Expand All @@ -54,7 +54,7 @@ where
if let Ok(m) = self.secondary.load(key, engine).await {
// Now we've got a module, let's make sure it ends up in the primary
// cache too.
if let Err(e) = self.primary.save(key, &m).await {
if let Err(e) = self.primary.save(key, engine, &m).await {
tracing::warn!(
%key,
error = &e as &dyn std::error::Error,
Expand All @@ -68,10 +68,15 @@ where
Err(primary_error)
}

async fn save(&self, key: Key, module: &Module) -> Result<(), CacheError> {
async fn save(
&self,
key: ModuleHash,
engine: &Engine,
module: &Module,
) -> Result<(), CacheError> {
futures::try_join!(
self.primary.save(key, module),
self.secondary.save(key, module)
self.primary.save(key, engine, module),
self.secondary.save(key, engine, module)
)?;
Ok(())
}
Expand Down
18 changes: 0 additions & 18 deletions lib/wasi/src/runtime/module_cache/disabled.rs

This file was deleted.

44 changes: 25 additions & 19 deletions lib/wasi/src/runtime/module_cache/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use wasmer::{Engine, Module};

use crate::runtime::module_cache::{CacheError, Key, ModuleCache};
use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash};

/// A cache that saves modules to a folder on the host filesystem using
/// [`Module::serialize()`].
Expand All @@ -26,24 +26,24 @@ impl FileSystemCache {
&self.cache_dir
}

fn path(&self, key: Key) -> PathBuf {
fn path(&self, key: ModuleHash, deterministic_id: &str) -> PathBuf {
let artifact_version = wasmer_types::MetadataHeader::CURRENT_VERSION;
let dir = format!("artifact-v{artifact_version}");
self.cache_dir
.join(dir)
.join(format!("{deterministic_id}-v{artifact_version}"))
.join(key.to_string())
.with_extension("bin")
}
}

#[async_trait::async_trait]
impl ModuleCache for FileSystemCache {
async fn load(&self, key: Key, engine: &Engine) -> Result<Module, CacheError> {
let path = self.path(key);
async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
let path = self.path(key, engine.deterministic_id());

// FIXME: This will all block the thread at the moment. Ideally,
// deserializing and uncompressing would happen on a thread pool in the
// background.
// https://github.com/wasmerio/wasmer/issues/3851

let uncompressed = read_compressed(&path)?;

Expand Down Expand Up @@ -71,12 +71,18 @@ impl ModuleCache for FileSystemCache {
}
}

async fn save(&self, key: Key, module: &Module) -> Result<(), CacheError> {
let path = self.path(key);
async fn save(
&self,
key: ModuleHash,
engine: &Engine,
module: &Module,
) -> Result<(), CacheError> {
let path = self.path(key, engine.deterministic_id());

// FIXME: This will all block the thread at the moment. Ideally,
// serializing and compressing would happen on a thread pool in the
// background.
// https://github.com/wasmerio/wasmer/issues/3851

if let Some(parent) = path.parent() {
if let Err(e) = std::fs::create_dir_all(parent) {
Expand Down Expand Up @@ -107,10 +113,10 @@ impl ModuleCache for FileSystemCache {
);
}

return Err(CacheError::Write { path, error: e });
return Err(CacheError::FileWrite { path, error: e });
}

std::fs::rename(&tmp, &path).map_err(|error| CacheError::Write { path, error })
std::fs::rename(&tmp, &path).map_err(|error| CacheError::FileWrite { path, error })
}
}

Expand All @@ -133,7 +139,7 @@ fn read_compressed(path: &Path) -> Result<Vec<u8>, CacheError> {
return Err(CacheError::NotFound);
}
Err(error) => {
return Err(CacheError::Read {
return Err(CacheError::FileRead {
path: path.to_path_buf(),
error,
});
Expand Down Expand Up @@ -173,10 +179,10 @@ mod tests {
let engine = Engine::default();
let module = Module::new(&engine, ADD_WAT).unwrap();
let cache = FileSystemCache::new(temp.path());
let key = Key::new([0; 32]);
let expected_path = cache.path(key);
let key = ModuleHash::new([0; 32]);
let expected_path = cache.path(key, engine.deterministic_id());

cache.save(key, &module).await.unwrap();
cache.save(key, &engine, &module).await.unwrap();

assert!(expected_path.exists());
}
Expand All @@ -189,9 +195,9 @@ mod tests {
let cache_dir = temp.path().join("this").join("doesn't").join("exist");
assert!(!cache_dir.exists());
let cache = FileSystemCache::new(&cache_dir);
let key = Key::new([0; 32]);
let key = ModuleHash::new([0; 32]);

cache.save(key, &module).await.unwrap();
cache.save(key, &engine, &module).await.unwrap();

assert!(cache_dir.is_dir());
}
Expand All @@ -200,7 +206,7 @@ mod tests {
async fn missing_file() {
let temp = TempDir::new().unwrap();
let engine = Engine::default();
let key = Key::new([0; 32]);
let key = ModuleHash::new([0; 32]);
let cache = FileSystemCache::new(temp.path());

let err = cache.load(key, &engine).await.unwrap_err();
Expand All @@ -213,9 +219,9 @@ mod tests {
let temp = TempDir::new().unwrap();
let engine = Engine::default();
let module = Module::new(&engine, ADD_WAT).unwrap();
let key = Key::new([0; 32]);
let key = ModuleHash::new([0; 32]);
let cache = FileSystemCache::new(temp.path());
let expected_path = cache.path(key);
let expected_path = cache.path(key, engine.deterministic_id());
std::fs::create_dir_all(expected_path.parent().unwrap()).unwrap();
let serialized = module.serialize().unwrap();
save_compressed(&expected_path, &serialized).unwrap();
Expand Down
5 changes: 1 addition & 4 deletions lib/wasi/src/runtime/module_cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
mod and_then;
mod disabled;
mod filesystem;
mod shared;
mod thread_local;
Expand All @@ -10,11 +9,9 @@ pub use self::{
filesystem::FileSystemCache,
shared::SharedCache,
thread_local::ThreadLocalCache,
types::{CacheError, Key, ModuleCache},
types::{CacheError, ModuleCache, ModuleHash},
};

pub(crate) use self::disabled::Disabled;

/// Get a [`ModuleCache`] which should be good enough for most in-memory use
/// cases.
///
Expand Down
17 changes: 12 additions & 5 deletions lib/wasi/src/runtime/module_cache/shared.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use dashmap::DashMap;
use wasmer::{Engine, Module};

use crate::runtime::module_cache::{CacheError, Key, ModuleCache};
use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash};

/// A [`ModuleCache`] based on a <code>[DashMap]<[Key], [Module]></code>.
/// A [`ModuleCache`] based on a <code>[DashMap]<[ModuleHash], [Module]></code>.
#[derive(Debug, Default, Clone)]
pub struct SharedCache {
modules: DashMap<Key, Module>,
modules: DashMap<(ModuleHash, String), Module>,
}

impl SharedCache {
Expand All @@ -17,14 +17,21 @@ impl SharedCache {

#[async_trait::async_trait]
impl ModuleCache for SharedCache {
async fn load(&self, key: Key, _engine: &Engine) -> Result<Module, CacheError> {
async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
let key = (key, engine.deterministic_id().to_string());
self.modules
.get(&key)
.map(|m| m.value().clone())
.ok_or(CacheError::NotFound)
}

async fn save(&self, key: Key, module: &Module) -> Result<(), CacheError> {
async fn save(
&self,
key: ModuleHash,
engine: &Engine,
module: &Module,
) -> Result<(), CacheError> {
let key = (key, engine.deterministic_id().to_string());
self.modules.insert(key, module.clone());

Ok(())
Expand Down
24 changes: 16 additions & 8 deletions lib/wasi/src/runtime/module_cache/thread_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::{cell::RefCell, collections::HashMap};

use wasmer::{Engine, Module};

use crate::runtime::module_cache::{CacheError, Key, ModuleCache};
use crate::runtime::module_cache::{CacheError, ModuleCache, ModuleHash};

std::thread_local! {
static CACHED_MODULES: RefCell<HashMap<Key, Module>>
static CACHED_MODULES: RefCell<HashMap<(ModuleHash, String), Module>>
= RefCell::new(HashMap::new());
}

Expand All @@ -15,23 +15,31 @@ std::thread_local! {
pub struct ThreadLocalCache {}

impl ThreadLocalCache {
fn lookup(&self, key: Key) -> Option<Module> {
fn lookup(&self, key: ModuleHash, deterministic_id: &str) -> Option<Module> {
let key = (key, deterministic_id.to_string());
CACHED_MODULES.with(|m| m.borrow().get(&key).cloned())
}

fn insert(&self, key: Key, module: &Module) {
fn insert(&self, key: ModuleHash, module: &Module, deterministic_id: &str) {
let key = (key, deterministic_id.to_string());
CACHED_MODULES.with(|m| m.borrow_mut().insert(key, module.clone()));
}
}

#[async_trait::async_trait]
impl ModuleCache for ThreadLocalCache {
async fn load(&self, key: Key, _engine: &Engine) -> Result<Module, CacheError> {
self.lookup(key).ok_or(CacheError::NotFound)
async fn load(&self, key: ModuleHash, engine: &Engine) -> Result<Module, CacheError> {
self.lookup(key, engine.deterministic_id())
.ok_or(CacheError::NotFound)
}

async fn save(&self, key: Key, module: &Module) -> Result<(), CacheError> {
self.insert(key, module);
async fn save(
&self,
key: ModuleHash,
engine: &Engine,
module: &Module,
) -> Result<(), CacheError> {
self.insert(key, module, engine.deterministic_id());
Ok(())
}
}
Loading