Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 69 additions & 5 deletions crates/uv-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use std::str::FromStr;
use std::sync::Arc;

use rustc_hash::FxHashMap;
use tracing::debug;
use tracing::{debug, warn};

pub use archive::ArchiveId;
use uv_cache_info::Timestamp;
use uv_fs::{LockedFile, cachedir, directories};
use uv_fs::{LockedFile, Simplified, cachedir, directories};
use uv_normalize::PackageName;
use uv_pypi_types::ResolutionMetadata;

Expand All @@ -22,6 +21,7 @@ use crate::removal::Remover;
pub use crate::removal::{Removal, rm_rf};
pub use crate::wheel::WheelCache;
use crate::wheel::WheelCacheKind;
pub use archive::ArchiveId;

mod archive;
mod by_timestamp;
Expand Down Expand Up @@ -135,6 +135,8 @@ impl Deref for CacheShard {
}

/// The main cache abstraction.
///
/// While the cache is active, it holds a read (shared) lock that prevents cache cleaning.
#[derive(Debug, Clone)]
pub struct Cache {
/// The cache directory.
Expand All @@ -146,6 +148,9 @@ pub struct Cache {
/// Included to ensure that the temporary directory exists for the length of the operation, but
/// is dropped at the end as appropriate.
temp_dir: Option<Arc<tempfile::TempDir>>,
/// Ensure that `uv cache` operations don't remove items from the cache that are used by another
/// uv process.
lock_file: Option<Arc<LockedFile>>,
}

impl Cache {
Expand All @@ -155,6 +160,7 @@ impl Cache {
root: root.into(),
refresh: Refresh::None(Timestamp::now()),
temp_dir: None,
lock_file: None,
}
}

Expand All @@ -165,6 +171,7 @@ impl Cache {
root: temp_dir.path().to_path_buf(),
refresh: Refresh::None(Timestamp::now()),
temp_dir: Some(Arc::new(temp_dir)),
lock_file: None,
})
}

Expand All @@ -174,6 +181,34 @@ impl Cache {
Self { refresh, ..self }
}

/// Acquire a lock that allows removing entries from the cache.
pub fn with_exclusive_lock(self) -> Result<Self, io::Error> {
let Self {
root,
refresh,
temp_dir,
lock_file,
} = self;

// Release the existing lock, avoid deadlocks from a cloned cache.
if let Some(lock_file) = lock_file {
drop(
Arc::try_unwrap(lock_file).expect(
"cloning the cache before acquiring an exclusive lock causes a deadlock",
),
);
}
let lock_file =
LockedFile::acquire_blocking(root.join(".lock"), root.simplified_display())?;

Ok(Self {
root,
refresh,
temp_dir,
lock_file: Some(Arc::new(lock_file)),
})
}

/// Return the root of the cache.
pub fn root(&self) -> &Path {
&self.root
Expand Down Expand Up @@ -359,15 +394,43 @@ impl Cache {
.join(".git"),
)?;

// Block cache removal operations from interfering.
let lock_file = match LockedFile::acquire_shared_blocking(
root.join(".lock"),
root.simplified_display(),
) {
Ok(lock_file) => Some(Arc::new(lock_file)),
Err(err) if err.kind() == io::ErrorKind::Unsupported => {
warn!(
"Shared locking is not supported by the current platform or filesystem, \
reduced parallel process safety with `uv cache clean` and `uv cache prune`."
);
None
}
Err(err) => return Err(err),
};

Ok(Self {
root: std::path::absolute(root)?,
lock_file,
..self
})
}

/// Clear the cache, removing all entries.
pub fn clear(&self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> {
Remover::new(reporter).rm_rf(&self.root)
pub fn clear(self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> {
// Remove everything but `.lock`, for Windows locked file special cases.
let mut removal = Remover::new(reporter).rm_rf(&self.root, true)?;
let Self {
root, lock_file, ..
} = self;
// Unlock `.lock`
drop(lock_file);
fs_err::remove_file(root.join(".lock"))?;
removal.num_files += 1;
fs_err::remove_dir(root)?;
removal.num_dirs += 1;
Ok(removal)
}

/// Remove a package from the cache.
Expand Down Expand Up @@ -407,6 +470,7 @@ impl Cache {
if entry.file_name() == "CACHEDIR.TAG"
|| entry.file_name() == ".gitignore"
|| entry.file_name() == ".git"
|| entry.file_name() == ".lock"
{
continue;
}
Expand Down
36 changes: 31 additions & 5 deletions crates/uv-cache/src/removal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::CleanReporter;
/// Remove a file or directory and all its contents, returning a [`Removal`] with
/// the number of files and directories removed, along with a total byte count.
pub fn rm_rf(path: impl AsRef<Path>) -> io::Result<Removal> {
Remover::default().rm_rf(path)
Remover::default().rm_rf(path, false)
}

/// A builder for a [`Remover`] that can remove files and directories.
Expand All @@ -29,9 +29,13 @@ impl Remover {

/// Remove a file or directory and all its contents, returning a [`Removal`] with
/// the number of files and directories removed, along with a total byte count.
pub(crate) fn rm_rf(&self, path: impl AsRef<Path>) -> io::Result<Removal> {
pub(crate) fn rm_rf(
&self,
path: impl AsRef<Path>,
skip_locked_file: bool,
) -> io::Result<Removal> {
let mut removal = Removal::default();
removal.rm_rf(path.as_ref(), self.reporter.as_deref())?;
removal.rm_rf(path.as_ref(), self.reporter.as_deref(), skip_locked_file)?;
Ok(removal)
}
}
Expand All @@ -52,7 +56,12 @@ pub struct Removal {

impl Removal {
/// Recursively remove a file or directory and all its contents.
fn rm_rf(&mut self, path: &Path, reporter: Option<&dyn CleanReporter>) -> io::Result<()> {
fn rm_rf(
&mut self,
path: &Path,
reporter: Option<&dyn CleanReporter>,
skip_locked_file: bool,
) -> io::Result<()> {
let metadata = match fs_err::symlink_metadata(path) {
Ok(metadata) => metadata,
Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
Expand Down Expand Up @@ -100,13 +109,25 @@ impl Removal {
if set_readable(dir).unwrap_or(false) {
// Retry the operation; if we _just_ `self.rm_rf(dir)` and continue,
// `walkdir` may give us duplicate entries for the directory.
return self.rm_rf(path, reporter);
return self.rm_rf(path, reporter, skip_locked_file);
}
}
}
}

let entry = entry?;

// Remove the exclusive lock last.
if skip_locked_file
&& entry.file_name() == ".lock"
&& entry
.path()
.strip_prefix(path)
.is_ok_and(|suffix| suffix == Path::new(".lock"))
{
continue;
}

if entry.file_type().is_symlink() && {
#[cfg(windows)]
{
Expand All @@ -121,6 +142,11 @@ impl Removal {
self.num_files += 1;
remove_dir(entry.path())?;
} else if entry.file_type().is_dir() {
// Remove the directory with the exclusive lock last.
if skip_locked_file && entry.path() == path {
continue;
}

self.num_dirs += 1;

// The contents should have been removed by now, but sometimes a race condition is
Expand Down
72 changes: 69 additions & 3 deletions crates/uv-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,9 +693,50 @@ impl LockedFile {
}
}

/// The same as [`LockedFile::acquire`], but for synchronous contexts. Do not use from an async
/// context, as this can block the runtime while waiting for another process to release the
/// lock.
/// Inner implementation for [`LockedFile::acquire_shared_blocking`] and
/// [`LockedFile::acquire_shared`]: take a shared (read) lock on `file`,
/// blocking until it becomes available.
fn lock_file_shared_blocking(
    file: fs_err::File,
    resource: &str,
) -> Result<Self, std::io::Error> {
    trace!(
        "Checking shared lock for `{resource}` at `{}`",
        file.path().user_display()
    );
    // TODO(konsti): Update fs_err to support this.
    if let Err(err) = FileExt::try_lock_shared(file.file()) {
        // Log error code and enum kind to help debugging more exotic failures.
        if err.kind() != std::io::ErrorKind::WouldBlock {
            debug!("Try lock error: {err:?}");
        }
        info!(
            "Waiting to acquire shared lock for `{resource}` at `{}`",
            file.path().user_display(),
        );
        // Fall back to a blocking wait for the shared lock.
        FileExt::lock_shared(file.file()).map_err(|err| {
            // Not an fs_err method, we need to build our own path context
            std::io::Error::other(format!(
                "Could not acquire shared lock for `{resource}` at `{}`: {}",
                file.path().user_display(),
                err
            ))
        })?;
    }

    debug!("Acquired shared lock for `{resource}`");
    Ok(Self(file))
}

/// The same as [`LockedFile::acquire`], but for synchronous contexts.
///
/// Do not use from an async context, as this can block the runtime while waiting for another
/// process to release the lock.
pub fn acquire_blocking(
path: impl AsRef<Path>,
resource: impl Display,
Expand All @@ -705,6 +746,19 @@ impl LockedFile {
Self::lock_file_blocking(file, &resource)
}

/// The same as [`LockedFile::acquire_blocking`], but acquires a shared (read) lock
/// instead of an exclusive one, so multiple processes may hold it concurrently.
///
/// Do not use from an async context, as this can block the runtime while waiting for another
/// process to release the lock.
pub fn acquire_shared_blocking(
    path: impl AsRef<Path>,
    resource: impl Display,
) -> Result<Self, std::io::Error> {
    let file = Self::create(path)?;
    let resource = resource.to_string();
    Self::lock_file_shared_blocking(file, &resource)
}

/// Acquire a cross-process lock for a resource using a file at the provided path.
#[cfg(feature = "tokio")]
pub async fn acquire(
Expand All @@ -716,6 +770,18 @@ impl LockedFile {
tokio::task::spawn_blocking(move || Self::lock_file_blocking(file, &resource)).await?
}

/// Acquire a cross-process read (shared) lock for a resource using a file at the provided path.
///
/// The blocking acquisition is offloaded to the tokio blocking thread pool so the
/// async runtime is not stalled while waiting for another process to release the lock.
#[cfg(feature = "tokio")]
pub async fn acquire_shared(
    path: impl AsRef<Path>,
    resource: impl Display,
) -> Result<Self, std::io::Error> {
    let file = Self::create(path)?;
    let name = resource.to_string();
    let handle =
        tokio::task::spawn_blocking(move || Self::lock_file_shared_blocking(file, &name));
    handle.await?
}

#[cfg(unix)]
fn create(path: impl AsRef<Path>) -> Result<fs_err::File, std::io::Error> {
use std::os::unix::fs::PermissionsExt;
Expand Down
6 changes: 4 additions & 2 deletions crates/uv/src/commands/cache_clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::printer::Printer;
/// Clear the cache, removing all entries or those linked to specific packages.
pub(crate) fn cache_clean(
packages: &[PackageName],
cache: &Cache,
cache: Cache,
printer: Printer,
) -> Result<ExitStatus> {
if !cache.root().exists() {
Expand All @@ -25,6 +25,7 @@ pub(crate) fn cache_clean(
)?;
return Ok(ExitStatus::Success);
}
let cache = cache.with_exclusive_lock()?;

let summary = if packages.is_empty() {
writeln!(
Expand All @@ -36,9 +37,10 @@ pub(crate) fn cache_clean(
let num_paths = walkdir::WalkDir::new(cache.root()).into_iter().count();
let reporter = CleaningDirectoryReporter::new(printer, num_paths);

let root = cache.root().to_path_buf();
cache
.clear(Box::new(reporter))
.with_context(|| format!("Failed to clear cache at: {}", cache.root().user_display()))?
.with_context(|| format!("Failed to clear cache at: {}", root.user_display()))?
} else {
let reporter = CleaningPackageReporter::new(printer, packages.len());
let mut summary = Removal::default();
Expand Down
5 changes: 3 additions & 2 deletions crates/uv/src/commands/cache_prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::commands::{ExitStatus, human_readable_bytes};
use crate::printer::Printer;

/// Prune all unreachable objects from the cache.
pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<ExitStatus> {
pub(crate) fn cache_prune(ci: bool, cache: Cache, printer: Printer) -> Result<ExitStatus> {
if !cache.root().exists() {
writeln!(
printer.stderr(),
Expand All @@ -19,6 +19,7 @@ pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<E
)?;
return Ok(ExitStatus::Success);
}
let cache = cache.with_exclusive_lock()?;

writeln!(
printer.stderr(),
Expand All @@ -29,7 +30,7 @@ pub(crate) fn cache_prune(ci: bool, cache: &Cache, printer: Printer) -> Result<E
let mut summary = Removal::default();

// Prune the source distribution cache, which is tightly coupled to the builder crate.
summary += uv_distribution::prune(cache)
summary += uv_distribution::prune(&cache)
.with_context(|| format!("Failed to prune cache at: {}", cache.root().user_display()))?;

// Prune the remaining cache buckets.
Expand Down
Loading
Loading