Skip to content

Commit

Permalink
Merge pull request #4335 from wasmerio/fix-for-blocking-runtime
Browse files Browse the repository at this point in the history
Fixed an issue where the package loader was blocking the tokio runtime
  • Loading branch information
theduke authored Mar 12, 2024
2 parents 7d2226f + 4ca51f9 commit 04ad072
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 27 deletions.
19 changes: 19 additions & 0 deletions lib/wasix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,22 @@ pub(crate) fn block_in_place<Ret>(thunk: impl FnOnce() -> Ret) -> Ret {
}
}
}

/// Spawns a new blocking task that runs the provided closure.
///
/// The closure is executed on a separate thread, allowing it to perform blocking operations
/// without blocking the main thread. The closure is wrapped in a `Future` that resolves to the
/// result of the closure's execution.
pub(crate) async fn spawn_blocking<F, R>(f: F) -> Result<R, tokio::task::JoinError>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
cfg_if::cfg_if! {
if #[cfg(target_arch = "wasm32")] {
Ok(block_in_place(f))
} else {
tokio::task::spawn_blocking(f).await
}
}
}
69 changes: 42 additions & 27 deletions lib/wasix/src/runtime/package_loader/builtin_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ impl BuiltinPackageLoader {
if dist.webc.scheme() == "file" {
match crate::runtime::resolver::utils::file_path_from_url(&dist.webc) {
Ok(path) => {
// FIXME: This will block the thread
let bytes = std::fs::read(&path)
.with_context(|| format!("Unable to read \"{}\"", path.display()))?;
let bytes = crate::spawn_blocking({
let path = path.clone();
move || std::fs::read(path)
})
.await?
.with_context(|| format!("Unable to read \"{}\"", path.display()))?;
return Ok(bytes.into());
}
Err(e) => {
Expand Down Expand Up @@ -222,7 +225,10 @@ impl PackageLoader for BuiltinPackageLoader {
// in a smart way to keep memory usage down.

if let Some(cache) = &self.cache {
match cache.save_and_load_as_mmapped(&bytes, &summary.dist).await {
match cache
.save_and_load_as_mmapped(bytes.clone(), &summary.dist)
.await
{
Ok(container) => {
tracing::debug!("Cached to disk");
self.in_memory.save(&container, summary.dist.webc_sha256);
Expand All @@ -245,7 +251,7 @@ impl PackageLoader for BuiltinPackageLoader {

// The sad path - looks like we don't have a filesystem cache so we'll
// need to keep the whole thing in memory.
let container = Container::from_bytes(bytes)?;
let container = crate::spawn_blocking(move || Container::from_bytes(bytes)).await??;
// We still want to cache it in memory, of course
self.in_memory.save(&container, summary.dist.webc_sha256);
Ok(container)
Expand All @@ -271,7 +277,11 @@ impl FileSystemCache {
async fn lookup(&self, hash: &WebcHash) -> Result<Option<Container>, Error> {
let path = self.path(hash);

let container = crate::block_in_place(|| Container::from_disk(&path));
let container = crate::spawn_blocking({
let path = path.clone();
move || Container::from_disk(path)
})
.await?;
match container {
Ok(c) => Ok(Some(c)),
Err(ContainerError::Open { error, .. })
Expand All @@ -288,35 +298,40 @@ impl FileSystemCache {
}
}

async fn save(&self, webc: &[u8], dist: &DistributionInfo) -> Result<(), Error> {
async fn save(&self, webc: Bytes, dist: &DistributionInfo) -> Result<(), Error> {
let path = self.path(&dist.webc_sha256);

let parent = path.parent().expect("Always within cache_dir");

std::fs::create_dir_all(parent)
.with_context(|| format!("Unable to create \"{}\"", parent.display()))?;

let mut temp = NamedTempFile::new_in(parent)?;
temp.write_all(webc)?;
temp.flush()?;
temp.as_file_mut().sync_all()?;
temp.persist(&path)?;

tracing::debug!(
pkg.hash=%dist.webc_sha256,
pkg.url=%dist.webc,
path=%path.display(),
num_bytes=webc.len(),
"Saved to disk",
);
let dist = dist.clone();

crate::spawn_blocking(move || {
let parent = path.parent().expect("Always within cache_dir");

std::fs::create_dir_all(parent)
.with_context(|| format!("Unable to create \"{}\"", parent.display()))?;

let mut temp = NamedTempFile::new_in(parent)?;
temp.write_all(&webc)?;
temp.flush()?;
temp.as_file_mut().sync_all()?;
temp.persist(&path)?;

tracing::debug!(
pkg.hash=%dist.webc_sha256,
pkg.url=%dist.webc,
path=%path.display(),
num_bytes=webc.len(),
"Saved to disk",
);
Result::<_, Error>::Ok(())
})
.await??;

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn save_and_load_as_mmapped(
&self,
webc: &[u8],
webc: Bytes,
dist: &DistributionInfo,
) -> Result<Container, Error> {
// First, save it to disk
Expand Down

0 comments on commit 04ad072

Please sign in to comment.