Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use webc's compat layer in more places #3777

Merged
merged 10 commits into from
Apr 20, 2023
124 changes: 25 additions & 99 deletions lib/wasi/src/bin_factory/binary_package.rs
Original file line number Diff line number Diff line change
@@ -1,145 +1,71 @@
use std::{
any::Any,
borrow::Cow,
collections::HashMap,
sync::{Arc, Mutex, RwLock},
};
use std::sync::{Arc, Mutex, RwLock};

use derivative::*;
use virtual_fs::{FileSystem, TmpFileSystem};
use wasmer_wasix_types::wasi::Snapshot0Clockid;
use once_cell::sync::OnceCell;
use virtual_fs::FileSystem;
use webc::compat::SharedBytes;

use super::hash_of_binary;
use crate::syscalls::platform_clock_time_get;

#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct BinaryPackageCommand {
pub name: String,
name: String,
#[derivative(Debug = "ignore")]
pub atom: Cow<'static, [u8]>,
hash: Option<String>,
pub ownership: Option<Arc<dyn Any + Send + Sync + 'static>>,
pub(crate) atom: SharedBytes,
hash: OnceCell<String>,
}

impl BinaryPackageCommand {
pub fn new(name: String, atom: Cow<'static, [u8]>) -> Self {
pub fn new(name: String, atom: SharedBytes) -> Self {
Self {
name,
ownership: None,
hash: None,
atom,
hash: OnceCell::new(),
}
}

/// Hold on to some arbitrary data for the lifetime of this binary pacakge.
///
/// # Safety
///
/// Must ensure that the atom data will be safe to use as long as the provided
/// ownership handle stays alive.
pub unsafe fn new_with_ownership<'a, T>(
name: String,
atom: Cow<'a, [u8]>,
ownership: Arc<T>,
) -> Self
where
T: 'static,
{
let ownership: Arc<dyn Any> = ownership;
let mut ret = Self::new(name, std::mem::transmute(atom));
ret.ownership = Some(std::mem::transmute(ownership));
ret
pub fn name(&self) -> &str {
&self.name
}

pub fn hash(&mut self) -> &str {
if self.hash.is_none() {
self.hash = Some(hash_of_binary(self.atom.as_ref()));
}
let hash = self.hash.as_ref().unwrap();
hash.as_str()
pub fn atom(&self) -> &[u8] {
&self.atom
}

pub fn hash(&self) -> &str {
self.hash.get_or_init(|| hash_of_binary(self.atom()))
}
}

/// A WebAssembly package that has been loaded into memory.
///
/// You can crate a [`BinaryPackage`] using [`crate::bin_factory::ModuleCache`]
/// or [`crate::wapm::parse_static_webc()`].
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct BinaryPackage {
pub package_name: Cow<'static, str>,
pub package_name: String,
pub when_cached: Option<u128>,
pub ownership: Option<Arc<dyn Any + Send + Sync + 'static>>,
#[derivative(Debug = "ignore")]
pub entry: Option<Cow<'static, [u8]>>,
pub entry: Option<SharedBytes>,
pub hash: Arc<Mutex<Option<String>>>,
pub wapm: Option<String>,
pub base_dir: Option<String>,
pub tmp_fs: TmpFileSystem,
pub webc_fs: Option<Arc<dyn FileSystem + Send + Sync + 'static>>,
pub webc_top_level_dirs: Vec<String>,
pub mappings: Vec<String>,
pub envs: HashMap<String, String>,
pub commands: Arc<RwLock<Vec<BinaryPackageCommand>>>,
pub uses: Vec<String>,
pub version: Cow<'static, str>,
pub version: String,
pub module_memory_footprint: u64,
pub file_system_memory_footprint: u64,
}

impl BinaryPackage {
pub fn new(package_name: &str, entry: Option<Cow<'static, [u8]>>) -> Self {
let now = platform_clock_time_get(Snapshot0Clockid::Monotonic, 1_000_000).unwrap() as u128;
let (package_name, version) = match package_name.split_once('@') {
Some((a, b)) => (a.to_string(), b.to_string()),
None => (package_name.to_string(), "1.0.0".to_string()),
};
let module_memory_footprint = entry.as_ref().map(|a| a.len()).unwrap_or_default() as u64;
Self {
package_name: package_name.into(),
when_cached: Some(now),
ownership: None,
entry,
hash: Arc::new(Mutex::new(None)),
wapm: None,
base_dir: None,
tmp_fs: TmpFileSystem::new(),
webc_fs: None,
webc_top_level_dirs: Default::default(),
mappings: Vec::new(),
envs: HashMap::default(),
commands: Arc::new(RwLock::new(Vec::new())),
uses: Vec::new(),
version: version.into(),
module_memory_footprint,
file_system_memory_footprint: 0,
}
}

/// Hold on to some arbitrary data for the lifetime of this binary pacakge.
///
/// # Safety
///
/// Must ensure that the entry data will be safe to use as long as the provided
/// ownership handle stays alive.
pub unsafe fn new_with_ownership<'a, T>(
package_name: &str,
entry: Option<Cow<'a, [u8]>>,
ownership: Arc<T>,
) -> Self
where
T: 'static,
{
let ownership: Arc<dyn Any> = ownership;
let mut ret = Self::new(package_name, entry.map(|a| std::mem::transmute(a)));
ret.ownership = Some(std::mem::transmute(ownership));
ret
}

pub fn hash(&self) -> String {
let mut hash = self.hash.lock().unwrap();
if hash.is_none() {
if let Some(entry) = self.entry.as_ref() {
hash.replace(hash_of_binary(entry.as_ref()));
} else {
hash.replace(hash_of_binary(self.package_name.as_ref()));
hash.replace(hash_of_binary(&self.package_name));
}
}
hash.as_ref().unwrap().clone()
Expand Down
39 changes: 30 additions & 9 deletions lib/wasi/src/bin_factory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{
collections::HashMap,
ops::Deref,
path::Path,
sync::{Arc, RwLock},
};

use anyhow::Context;
use virtual_fs::{AsyncReadExt, FileSystem};

mod binary_package;
Expand Down Expand Up @@ -78,15 +80,17 @@ impl BinFactory {
// Check the filesystem for the file
if name.starts_with('/') {
if let Some(fs) = fs {
if let Ok(mut file) = fs.new_open_options().read(true).open(name.clone()) {
// Read the file
let mut data = Vec::with_capacity(file.size() as usize);
// TODO: log error?
if file.read_to_end(&mut data).await.is_ok() {
let package_name = name.split('/').last().unwrap_or(name.as_str());
let data = BinaryPackage::new(package_name, Some(data.into()));
cache.insert(name, Some(data.clone()));
return Some(data);
match load_package_from_disk(fs, name.as_ref()).await {
Ok(pkg) => {
cache.insert(name, Some(pkg.clone()));
return Some(pkg);
}
Err(e) => {
tracing::warn!(
path = name,
error = &*e,
"Unable to load the package from disk"
);
}
}
}
Expand All @@ -98,6 +102,23 @@ impl BinFactory {
}
}

async fn load_package_from_disk(
fs: &dyn FileSystem,
path: &Path,
) -> Result<BinaryPackage, anyhow::Error> {
let mut f = fs
.new_open_options()
.read(true)
.open(path)
.context("Unable to open the file")?;

let mut data = Vec::with_capacity(f.size() as usize);
f.read_to_end(&mut data).await.context("Read failed")?;
let pkg = crate::wapm::parse_static_webc(data).context("Unable to parse the package")?;

Ok(pkg)
}

pub fn hash_of_binary(data: impl AsRef<[u8]>) -> String {
let mut hasher = Sha256::default();
hasher.update(data.as_ref());
Expand Down
54 changes: 23 additions & 31 deletions lib/wasi/src/bin_factory/module_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ pub const DEFAULT_CACHE_TIME: std::time::Duration = std::time::Duration::from_se

#[derive(Debug)]
pub struct ModuleCache {
pub(crate) cache_compile_dir: String,
pub(crate) cache_compile_dir: PathBuf,
pub(crate) cached_modules: Option<RwLock<HashMap<String, Module>>>,

pub(crate) cache_webc: RwLock<HashMap<String, BinaryPackage>>,
pub(crate) cache_webc_dir: String,
pub(crate) cache_webc_dir: PathBuf,

pub(crate) cache_time: std::time::Duration,
}
Expand Down Expand Up @@ -51,21 +51,18 @@ impl ModuleCache {
///
/// use_shared_cache enables a shared cache of modules in addition to a thread-local cache.
pub fn new(
cache_compile_dir: Option<String>,
cache_webc_dir: Option<String>,
cache_compile_dir: Option<PathBuf>,
cache_webc_dir: Option<PathBuf>,
use_shared_cache: bool,
) -> ModuleCache {
let cache_compile_dir = shellexpand::tilde(
cache_compile_dir
.as_deref()
.unwrap_or(DEFAULT_COMPILED_PATH),
)
.to_string();
let _ = std::fs::create_dir_all(PathBuf::from(cache_compile_dir.clone()));

let cache_webc_dir =
shellexpand::tilde(cache_webc_dir.as_deref().unwrap_or(DEFAULT_WEBC_PATH)).to_string();
let _ = std::fs::create_dir_all(PathBuf::from(cache_webc_dir.clone()));
let cache_compile_dir = cache_compile_dir.unwrap_or_else(|| {
PathBuf::from(shellexpand::tilde(DEFAULT_COMPILED_PATH).into_owned())
});
let _ = std::fs::create_dir_all(&cache_compile_dir);

let cache_webc_dir = cache_webc_dir
.unwrap_or_else(|| PathBuf::from(shellexpand::tilde(DEFAULT_WEBC_PATH).into_owned()));
let _ = std::fs::create_dir_all(&cache_webc_dir);

let cached_modules = if use_shared_cache {
Some(RwLock::new(HashMap::default()))
Expand Down Expand Up @@ -145,8 +142,9 @@ impl ModuleCache {
.split_once(':')
.map(|a| a.0)
.unwrap_or_else(|| name.as_str());
let cache_webc_dir = self.cache_webc_dir.as_str();
if let Ok(mut data) = crate::wapm::fetch_webc_task(cache_webc_dir, wapm_name, runtime) {
if let Ok(mut data) =
crate::wapm::fetch_webc_task(&self.cache_webc_dir, wapm_name, runtime)
{
// If the binary has no entry but it inherits from another module
// that does have an entry then we fall back to that inherited entry point
// (this convention is recursive down the list of inheritance until it finds the first entry point)
Expand Down Expand Up @@ -221,8 +219,7 @@ impl ModuleCache {
}

// slow path
let path = std::path::Path::new(self.cache_compile_dir.as_str())
.join(format!("{}.bin", key).as_str());
let path = self.cache_compile_dir.join(format!("{}.bin", key).as_str());
if let Ok(data) = std::fs::read(path.as_path()) {
tracing::trace!("bin file found: {:?} [len={}]", path.as_path(), data.len());
let mut decoder = weezl::decode::Decoder::new(weezl::BitOrder::Msb, 8);
Expand Down Expand Up @@ -277,8 +274,7 @@ impl ModuleCache {
// We should also attempt to store it in the cache directory
let compiled_bytes = module.serialize().unwrap();

let path = std::path::Path::new(self.cache_compile_dir.as_str())
.join(format!("{}.bin", key).as_str());
let path = self.cache_compile_dir.join(format!("{}.bin", key).as_str());
// TODO: forward error!
let _ = std::fs::create_dir_all(path.parent().unwrap());
let mut encoder = weezl::encode::Encoder::new(weezl::BitOrder::Msb, 8);
Expand All @@ -293,23 +289,19 @@ impl ModuleCache {
mod tests {
use std::{sync::Arc, time::Duration};

use tracing_subscriber::{
filter, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, Layer,
};
use tracing_subscriber::filter::LevelFilter;

use crate::{runtime::task_manager::tokio::TokioTaskManager, PluggableRuntime};

use super::*;

#[test]
fn test_module_cache() {
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.pretty()
.with_filter(filter::LevelFilter::INFO),
)
.init();
let _ = tracing_subscriber::fmt()
.pretty()
.with_test_writer()
.with_max_level(LevelFilter::INFO)
.try_init();

let mut cache = ModuleCache::new(None, None, true);
cache.cache_time = std::time::Duration::from_millis(500);
Expand Down
Loading