use spawn_blocking in storage store_all & store_all_in_archive
syphar committed Oct 13, 2023
1 parent ad5bf58 commit 7ed1886
Showing 1 changed file with 96 additions and 72 deletions.
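The change moves synchronous filesystem, zip, and compression work onto Tokio's blocking thread pool so that async runtime workers are not stalled while archives are built. As a primer, here is a minimal sketch of the underlying pattern, assuming a Tokio runtime and anyhow for errors; the function and its name are illustrative, not code from this commit:

use std::{fs, path::PathBuf};

/// Illustrative only: own everything the closure needs, move it onto the
/// blocking pool, then unwrap twice: once for the join, once for the I/O.
async fn read_off_runtime(path: PathBuf) -> anyhow::Result<Vec<u8>> {
    let bytes = tokio::task::spawn_blocking(move || {
        // Anything in this closure may block freely (file I/O, compression, ...).
        fs::read(&path)
    })
    .await??;
    Ok(bytes)
}

Note that `tokio::task::spawn_blocking` requires `'static` data, which is why the commit clones `archive_path`, `root_dir`, and the local cache path before moving them into the closures below.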
168 changes: 96 additions & 72 deletions src/storage/mod.rs
@@ -367,47 +367,64 @@ impl AsyncStorage {
         archive_path: &str,
         root_dir: &Path,
     ) -> Result<(HashMap<PathBuf, String>, CompressionAlgorithm)> {
-        let mut file_paths = HashMap::new();
-
-        // We are only using the `zip` library to create the archives and the matching
-        // index-file. The ZIP format allows more compression formats, and these can even be mixed
-        // in a single archive.
-        //
-        // Decompression happens by fetching only the part of the remote archive that contains
-        // the compressed stream of the object we put into the archive.
-        // For decompression we are sharing the compression algorithms defined in
-        // `storage::compression`. So every new algorithm to be used inside ZIP archives
-        // also has to be added as supported algorithm for storage compression, together
-        // with a mapping in `storage::archive_index::Index::new_from_zip`.
-
-        let options =
-            zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Bzip2);
-
-        let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new()));
-        for file_path in get_file_list(root_dir)? {
-            let mut file = fs::File::open(root_dir.join(&file_path))?;
-
-            zip.start_file(file_path.to_str().unwrap(), options)?;
-            io::copy(&mut file, &mut zip)?;
-
-            let mime = detect_mime(&file_path);
-            file_paths.insert(file_path, mime.to_string());
-        }
-
-        let mut zip_content = zip.finish()?.into_inner();
-
-        let remote_index_path = format!("{}.index", &archive_path);
-
-        let local_index_path = self
-            .config
-            .local_archive_cache_path
-            .join(&remote_index_path);
-        fs::create_dir_all(local_index_path.parent().unwrap())?;
-        archive_index::create(&mut io::Cursor::new(&mut zip_content), &local_index_path)?;
+        let (zip_content, compressed_index_content, alg, remote_index_path, file_paths) =
+            spawn_blocking({
+                let archive_path = archive_path.to_owned();
+                let root_dir = root_dir.to_owned();
+                let local_archive_cache_path = self.config.local_archive_cache_path.clone();
+
+                move || {
+                    let mut file_paths = HashMap::new();
+
+                    // We are only using the `zip` library to create the archives and the matching
+                    // index-file. The ZIP format allows more compression formats, and these can even be mixed
+                    // in a single archive.
+                    //
+                    // Decompression happens by fetching only the part of the remote archive that contains
+                    // the compressed stream of the object we put into the archive.
+                    // For decompression we are sharing the compression algorithms defined in
+                    // `storage::compression`. So every new algorithm to be used inside ZIP archives
+                    // also has to be added as supported algorithm for storage compression, together
+                    // with a mapping in `storage::archive_index::Index::new_from_zip`.
+
+                    let options = zip::write::FileOptions::default()
+                        .compression_method(zip::CompressionMethod::Bzip2);
+
+                    let mut zip = zip::ZipWriter::new(io::Cursor::new(Vec::new()));
+                    for file_path in get_file_list(&root_dir)? {
+                        let mut file = fs::File::open(&root_dir.join(&file_path))?;
+
+                        zip.start_file(file_path.to_str().unwrap(), options)?;
+                        io::copy(&mut file, &mut zip)?;
+
+                        let mime = detect_mime(&file_path);
+                        file_paths.insert(file_path, mime.to_string());
+                    }
+
-        let alg = CompressionAlgorithm::default();
-        let compressed_index_content =
-            compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;
+                    let mut zip_content = zip.finish()?.into_inner();
+
+                    let remote_index_path = format!("{}.index", &archive_path);
+
+                    let local_index_path = local_archive_cache_path.join(&remote_index_path);
+                    fs::create_dir_all(local_index_path.parent().unwrap())?;
+                    archive_index::create(
+                        &mut io::Cursor::new(&mut zip_content),
+                        &local_index_path,
+                    )?;
+
+                    let alg = CompressionAlgorithm::default();
+                    let compressed_index_content =
+                        compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;
+                    Ok((
+                        zip_content,
+                        compressed_index_content,
+                        alg,
+                        remote_index_path,
+                        file_paths,
+                    ))
+                }
+            })
+            .await?;

         self.store_inner(vec![
             Blob {
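A detail worth noting in the hunk above: a single `?` after `.await` suffices, whereas raw `tokio::task::spawn_blocking` would need two (one for the `JoinError`, one for the closure's own `Result`). That suggests `spawn_blocking` here is a project-local helper that flattens the join error. A sketch of what such a wrapper could look like; this is an assumption, not the actual docs.rs helper:

use anyhow::{Context as _, Result};

// Hypothetical wrapper (assumption, not taken from the docs.rs source):
// run a blocking, fallible closure on Tokio's blocking pool and fold the
// JoinError into the anyhow::Result, so call sites need only one `?`.
pub async fn spawn_blocking<F, R>(f: F) -> Result<R>
where
    F: FnOnce() -> Result<R> + Send + 'static,
    R: Send + 'static,
{
    tokio::task::spawn_blocking(f)
        .await
        .context("failed to join blocking task")?
}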
@@ -439,38 +456,45 @@ impl AsyncStorage {
         prefix: &Path,
         root_dir: &Path,
     ) -> Result<(HashMap<PathBuf, String>, HashSet<CompressionAlgorithm>)> {
-        let mut file_paths_and_mimes = HashMap::new();
-        let mut algs = HashSet::with_capacity(1);
-
-        let blobs: Vec<_> = get_file_list(root_dir)?
-            .into_iter()
-            .filter_map(|file_path| {
-                // Some files have insufficient permissions
-                // (like .lock file created by cargo in documentation directory).
-                // Skip these files.
-                fs::File::open(root_dir.join(&file_path))
-                    .ok()
-                    .map(|file| (file_path, file))
-            })
-            .map(|(file_path, file)| -> Result<_> {
-                let alg = CompressionAlgorithm::default();
-                let content = compress(file, alg)?;
-                let bucket_path = prefix.join(&file_path).to_slash().unwrap().to_string();
-
-                let mime = detect_mime(&file_path);
-                file_paths_and_mimes.insert(file_path, mime.to_string());
-                algs.insert(alg);
-
-                Ok(Blob {
-                    path: bucket_path,
-                    mime: mime.to_string(),
-                    content,
-                    compression: Some(alg),
-                    // this field is ignored by the backend
-                    date_updated: Utc::now(),
-                })
-            })
-            .collect::<Result<Vec<_>>>()?;
+        let (blobs, file_paths_and_mimes, algs) = spawn_blocking({
+            let prefix = prefix.to_owned();
+            let root_dir = root_dir.to_owned();
+            move || {
+                let mut file_paths_and_mimes = HashMap::new();
+                let mut algs = HashSet::with_capacity(1);
+                let blobs: Vec<_> = get_file_list(&root_dir)?
+                    .into_iter()
+                    .filter_map(|file_path| {
+                        // Some files have insufficient permissions
+                        // (like .lock file created by cargo in documentation directory).
+                        // Skip these files.
+                        fs::File::open(root_dir.join(&file_path))
+                            .ok()
+                            .map(|file| (file_path, file))
+                    })
+                    .map(|(file_path, file)| -> Result<_> {
+                        let alg = CompressionAlgorithm::default();
+                        let content = compress(file, alg)?;
+                        let bucket_path = prefix.join(&file_path).to_slash().unwrap().to_string();
+
+                        let mime = detect_mime(&file_path);
+                        file_paths_and_mimes.insert(file_path, mime.to_string());
+                        algs.insert(alg);
+
+                        Ok(Blob {
+                            path: bucket_path,
+                            mime: mime.to_string(),
+                            content,
+                            compression: Some(alg),
+                            // this field is ignored by the backend
+                            date_updated: Utc::now(),
+                        })
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                Ok((blobs, file_paths_and_mimes, algs))
+            }
+        })
+        .await?;

         self.store_inner(blobs).await?;
         Ok((file_paths_and_mimes, algs))
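The second hunk applies the same treatment to `store_all`: the entire read, compress, and collect pipeline now runs inside one blocking closure, which makes sense because `compress` is CPU-bound and every `fs::File::open` can block. It also preserves the skip-on-open-failure idiom from the original code; a standalone sketch of that idiom, with illustrative names:

use std::{fs, path::PathBuf};

// Illustrative sketch of the idiom used above: `File::open(..).ok()` inside
// `filter_map` silently drops entries that cannot be opened (for example,
// permission errors on cargo's .lock file) rather than failing the batch.
fn readable_files(paths: Vec<PathBuf>) -> Vec<(PathBuf, fs::File)> {
    paths
        .into_iter()
        .filter_map(|path| fs::File::open(&path).ok().map(|file| (path, file)))
        .collect()
}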
