Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed an issue where the package loader was blocking the tokio runtime #4335

Merged
merged 4 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions lib/wasix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,22 @@ pub(crate) fn block_in_place<Ret>(thunk: impl FnOnce() -> Ret) -> Ret {
}
}
}

/// Spawns a new blocking task that runs the provided closure.
///
/// The closure is executed on a separate thread, allowing it to perform blocking operations
/// without blocking the main thread. The closure is wrapped in a `Future` that resolves to the
/// result of the closure's execution.
pub(crate) async fn spawn_blocking<F, R>(f: F) -> Result<R, tokio::task::JoinError>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
cfg_if::cfg_if! {
if #[cfg(target_arch = "wasm32")] {
Ok(block_in_place(f))
} else {
tokio::task::spawn_blocking(f).await
}
}
}
69 changes: 42 additions & 27 deletions lib/wasix/src/runtime/package_loader/builtin_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,12 @@ impl BuiltinPackageLoader {
if dist.webc.scheme() == "file" {
match crate::runtime::resolver::utils::file_path_from_url(&dist.webc) {
Ok(path) => {
// FIXME: This will block the thread
let bytes = std::fs::read(&path)
.with_context(|| format!("Unable to read \"{}\"", path.display()))?;
let bytes = crate::spawn_blocking({
let path = path.clone();
move || std::fs::read(path)
})
.await?
.with_context(|| format!("Unable to read \"{}\"", path.display()))?;
return Ok(bytes.into());
}
Err(e) => {
Expand Down Expand Up @@ -222,7 +225,10 @@ impl PackageLoader for BuiltinPackageLoader {
// in a smart way to keep memory usage down.

if let Some(cache) = &self.cache {
match cache.save_and_load_as_mmapped(&bytes, &summary.dist).await {
match cache
.save_and_load_as_mmapped(bytes.clone(), &summary.dist)
.await
{
Ok(container) => {
tracing::debug!("Cached to disk");
self.in_memory.save(&container, summary.dist.webc_sha256);
Expand All @@ -245,7 +251,7 @@ impl PackageLoader for BuiltinPackageLoader {

// The sad path - looks like we don't have a filesystem cache so we'll
// need to keep the whole thing in memory.
let container = Container::from_bytes(bytes)?;
let container = crate::spawn_blocking(move || Container::from_bytes(bytes)).await??;
// We still want to cache it in memory, of course
self.in_memory.save(&container, summary.dist.webc_sha256);
Ok(container)
Expand All @@ -271,7 +277,11 @@ impl FileSystemCache {
async fn lookup(&self, hash: &WebcHash) -> Result<Option<Container>, Error> {
let path = self.path(hash);

let container = crate::block_in_place(|| Container::from_disk(&path));
let container = crate::spawn_blocking({
let path = path.clone();
move || Container::from_disk(path)
})
.await?;
match container {
Ok(c) => Ok(Some(c)),
Err(ContainerError::Open { error, .. })
Expand All @@ -288,35 +298,40 @@ impl FileSystemCache {
}
}

async fn save(&self, webc: &[u8], dist: &DistributionInfo) -> Result<(), Error> {
async fn save(&self, webc: Bytes, dist: &DistributionInfo) -> Result<(), Error> {
let path = self.path(&dist.webc_sha256);

let parent = path.parent().expect("Always within cache_dir");

std::fs::create_dir_all(parent)
.with_context(|| format!("Unable to create \"{}\"", parent.display()))?;

let mut temp = NamedTempFile::new_in(parent)?;
temp.write_all(webc)?;
temp.flush()?;
temp.as_file_mut().sync_all()?;
temp.persist(&path)?;

tracing::debug!(
pkg.hash=%dist.webc_sha256,
pkg.url=%dist.webc,
path=%path.display(),
num_bytes=webc.len(),
"Saved to disk",
);
let dist = dist.clone();

crate::spawn_blocking(move || {
let parent = path.parent().expect("Always within cache_dir");

std::fs::create_dir_all(parent)
.with_context(|| format!("Unable to create \"{}\"", parent.display()))?;

let mut temp = NamedTempFile::new_in(parent)?;
temp.write_all(&webc)?;
temp.flush()?;
temp.as_file_mut().sync_all()?;
temp.persist(&path)?;

tracing::debug!(
pkg.hash=%dist.webc_sha256,
pkg.url=%dist.webc,
path=%path.display(),
num_bytes=webc.len(),
"Saved to disk",
);
Result::<_, Error>::Ok(())
})
.await??;

Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn save_and_load_as_mmapped(
&self,
webc: &[u8],
webc: Bytes,
dist: &DistributionInfo,
) -> Result<Container, Error> {
// First, save it to disk
Expand Down
Loading