diff --git a/crates/uv-bench/benches/uv.rs b/crates/uv-bench/benches/uv.rs index d6ac82c3d9c3e..3ebf301c7206f 100644 --- a/crates/uv-bench/benches/uv.rs +++ b/crates/uv-bench/benches/uv.rs @@ -256,6 +256,7 @@ mod resolver { client, &build_context, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), )?; diff --git a/crates/uv-configuration/src/concurrency.rs b/crates/uv-configuration/src/concurrency.rs index ae468120d1392..975b085378ccd 100644 --- a/crates/uv-configuration/src/concurrency.rs +++ b/crates/uv-configuration/src/concurrency.rs @@ -24,6 +24,8 @@ pub struct Concurrency { pub downloads_semaphore: Arc<Semaphore>, /// A global semaphore to limit the number of concurrent builds. pub builds_semaphore: Arc<Semaphore>, + /// A global semaphore to limit the number of concurrent source distribution preparations. + pub source_distribution_semaphore: Arc<Semaphore>, } /// Custom `Debug` to hide semaphore fields from `--show-settings` output. @@ -56,6 +58,7 @@ impl Concurrency { installs, downloads_semaphore: Arc::new(Semaphore::new(downloads)), builds_semaphore: Arc::new(Semaphore::new(builds)), + source_distribution_semaphore: Arc::new(Semaphore::new(builds)), } } diff --git a/crates/uv-dispatch/src/lib.rs b/crates/uv-dispatch/src/lib.rs index 94bc9ad055cb9..05b541e396040 100644 --- a/crates/uv-dispatch/src/lib.rs +++ b/crates/uv-dispatch/src/lib.rs @@ -268,6 +268,7 @@ impl BuildContext for BuildDispatch<'_> { self.client, self, self.concurrency.downloads_semaphore.clone(), + self.concurrency.source_distribution_semaphore.clone(), ) .with_build_stack(build_stack), )?; @@ -361,6 +362,7 @@ impl BuildContext for BuildDispatch<'_> { self.client, self, self.concurrency.downloads_semaphore.clone(), + self.concurrency.source_distribution_semaphore.clone(), ) .with_build_stack(build_stack), ); diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs index 528a2cc78502f..b93d2399d2f4a 100644 ---
a/crates/uv-distribution/src/distribution_database.rs +++ b/crates/uv-distribution/src/distribution_database.rs @@ -63,10 +63,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> { client: &'a RegistryClient, build_context: &'a Context, downloads_semaphore: Arc<Semaphore>, + source_distribution_semaphore: Arc<Semaphore>, ) -> Self { Self { build_context, - builder: SourceDistributionBuilder::new(build_context), + builder: SourceDistributionBuilder::new(build_context, source_distribution_semaphore), client: ManagedClient::new(client, downloads_semaphore), reporter: None, } diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs index 17c07fceae074..f2f3b4dbf2967 100644 --- a/crates/uv-distribution/src/source/mod.rs +++ b/crates/uv-distribution/src/source/mod.rs @@ -14,6 +14,8 @@ use std::path::Path; use std::str::FromStr; use std::sync::Arc; +use tokio::sync::{Semaphore, SemaphorePermit}; + use fs_err::tokio as fs; use futures::{FutureExt, TryStreamExt}; use reqwest::{Response, StatusCode}; @@ -63,6 +65,9 @@ pub(crate) struct SourceDistributionBuilder<'a, T: BuildContext> { build_context: &'a T, build_stack: Option<&'a BuildStack>, reporter: Option<Arc<dyn Reporter>>, + /// Limits concurrent source distribution preparation work. + /// This bounds tasks that may hold shard locks and open many file descriptors. + concurrency_limit: Arc<Semaphore>, } /// The name of the file that contains the revision ID for a remote distribution, encoded via `MsgPack`. @@ -79,11 +84,12 @@ pub(crate) const SOURCE: &str = "src"; impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { /// Initialize a [`SourceDistributionBuilder`] from a [`BuildContext`].
- pub(crate) fn new(build_context: &'a T) -> Self { + pub(crate) fn new(build_context: &'a T, concurrency_limit: Arc<Semaphore>) -> Self { Self { build_context, build_stack: None, reporter: None, + concurrency_limit, } } @@ -105,6 +111,69 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { } } + /// Acquire a permit from the concurrency limiter before acquiring a cache lock. + async fn acquire_concurrency_permit(&self) -> SemaphorePermit<'_> { + self.concurrency_limit + .acquire() + .await + .expect("concurrency semaphore should not be closed") + } + + /// Read cached metadata if it exists and matches the expected source identity. + async fn read_matching_cached_metadata( + &self, + source: &BuildableSource<'_>, + metadata_entry: &CacheEntry, + ) -> Option<CachedMetadata> { + match CachedMetadata::read(metadata_entry).await { + Ok(Some(metadata)) => { + if metadata.matches(source.name(), source.version()) { + debug!("Using cached metadata for: {source}"); + Some(metadata) + } else { + debug!( + "Cached metadata does not match expected name and version for: {source}" + ); + None + } + } + Ok(None) => None, + Err(err) => { + debug!("Failed to deserialize cached metadata for: {source} ({err})"); + None + } + } + } + + async fn commit_canonical_local_revision_pointer( + &self, + lock_shard: &CacheShard, + pointer: LocalRevisionPointer, + hashes: HashPolicy<'_>, + ) -> Result<LocalRevisionPointer, Error> { + let entry = lock_shard.entry(LOCAL_REVISION); + if let Some(canonical) = LocalRevisionPointer::read_from(&entry)? { + if *canonical.cache_info() == *pointer.cache_info() + && canonical.revision().has_digests(hashes) + { + return Ok(canonical); + } + } + + pointer.write_to(&entry).await?; + Ok(pointer) + } + + fn read_canonical_http_revision( + lock_shard: &CacheShard, + hashes: HashPolicy<'_>, + ) -> Result<Option<Revision>, Error> { + let entry = lock_shard.entry(HTTP_REVISION); + Ok(HttpRevisionPointer::read_from(&entry)?
+ .map(HttpRevisionPointer::into_revision) + .filter(|revision| revision.has_digests(hashes))) + } + /// Download and build a [`SourceDist`]. pub(crate) async fn download_and_build( &self, @@ -457,8 +526,6 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { hashes: HashPolicy<'_>, client: &ManagedClient<'_>, ) -> Result { - let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; - // Fetch the revision for the source distribution. let revision = self .url_revision(source, ext, url, index, cache_shard, hashes, client) @@ -475,7 +542,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Scope all operations to the revision. Within the revision, there's no need to check for // freshness, since entries have to be fresher than the revision itself. - let cache_shard = cache_shard.shard(revision.id()); + let lock_shard = cache_shard; + let cache_shard = lock_shard.shard(revision.id()); let source_dist_entry = cache_shard.entry(SOURCE); // We don't track any cache information for URL-based source distributions; they're assumed @@ -534,6 +602,54 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { } } + // Acquire the concurrency permit and advisory lock. + let _permit = self.acquire_concurrency_permit().await; + let _lock = lock_shard.lock().await.map_err(Error::CacheLock)?; + let revision = Self::read_canonical_http_revision(lock_shard, hashes)?.unwrap_or(revision); + let cache_shard = lock_shard.shard(revision.id()); + let source_dist_entry = cache_shard.entry(SOURCE); + let revision = if source_dist_entry.path().is_dir() { + revision + } else { + self.heal_url_revision( + source, + ext, + url, + index, + &source_dist_entry, + revision, + hashes, + client, + ) + .await? 
+ }; + if let Some(subdirectory) = subdirectory { + if !source_dist_entry.path().join(subdirectory).is_dir() { + return Err(Error::MissingSubdirectory( + url.clone(), + subdirectory.to_path_buf(), + )); + } + } + let cache_shard = build_info + .cache_shard() + .map(|digest| cache_shard.shard(digest)) + .unwrap_or(cache_shard); + + // Re-check the cache under lock to avoid duplicate builds across concurrent tasks. + if let Some(file) = BuiltWheelFile::find_in_cache(tags, &cache_shard) + .ok() + .flatten() + .filter(|file| file.matches(source.name(), source.version())) + { + return Ok(BuiltWheelMetadata::from_file( + file, + revision.into_hashes(), + cache_info, + build_info, + )); + } + let task = self .reporter .as_ref() @@ -587,8 +703,6 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { hashes: HashPolicy<'_>, client: &ManagedClient<'_>, ) -> Result { - let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; - // Fetch the revision for the source distribution. let revision = self .url_revision(source, ext, url, index, cache_shard, hashes, client) @@ -605,7 +719,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Scope all operations to the revision. Within the revision, there's no need to check for // freshness, since entries have to be fresher than the revision itself. - let cache_shard = cache_shard.shard(revision.id()); + let lock_shard = cache_shard; + let cache_shard = lock_shard.shard(revision.id()); let source_dist_entry = cache_shard.entry(SOURCE); // If the metadata is static, return it. @@ -623,21 +738,15 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // If the cache contains compatible metadata, return it. 
let metadata_entry = cache_shard.entry(METADATA); - match CachedMetadata::read(&metadata_entry).await { - Ok(Some(metadata)) => { - if metadata.matches(source.name(), source.version()) { - debug!("Using cached metadata for: {source}"); - return Ok(ArchiveMetadata { - metadata: Metadata::from_metadata23(metadata.into()), - hashes: revision.into_hashes(), - }); - } - debug!("Cached metadata does not match expected name and version for: {source}"); - } - Ok(None) => {} - Err(err) => { - debug!("Failed to deserialize cached metadata for: {source} ({err})"); - } + + if let Some(metadata) = self + .read_matching_cached_metadata(source, &metadata_entry) + .await + { + return Ok(ArchiveMetadata { + metadata: Metadata::from_metadata23(metadata.into()), + hashes: revision.into_hashes(), + }); } // Otherwise, we need a wheel. @@ -667,6 +776,48 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { } } + // Acquire the concurrency permit and advisory lock. + let _permit = self.acquire_concurrency_permit().await; + let _lock = lock_shard.lock().await.map_err(Error::CacheLock)?; + let revision = Self::read_canonical_http_revision(lock_shard, hashes)?.unwrap_or(revision); + let cache_shard = lock_shard.shard(revision.id()); + let source_dist_entry = cache_shard.entry(SOURCE); + let revision = if source_dist_entry.path().is_dir() { + revision + } else { + self.heal_url_revision( + source, + ext, + url, + index, + &source_dist_entry, + revision, + hashes, + client, + ) + .await? + }; + if let Some(subdirectory) = subdirectory { + if !source_dist_entry.path().join(subdirectory).is_dir() { + return Err(Error::MissingSubdirectory( + url.clone(), + subdirectory.to_path_buf(), + )); + } + } + let metadata_entry = cache_shard.entry(METADATA); + + // Re-check the cache under lock to avoid duplicate builds across concurrent tasks. 
+ if let Some(metadata) = self + .read_matching_cached_metadata(source, &metadata_entry) + .await + { + return Ok(ArchiveMetadata { + metadata: Metadata::from_metadata23(metadata.into()), + hashes: revision.into_hashes(), + }); + } + // Otherwise, we either need to build the metadata. // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self @@ -861,15 +1012,12 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { tags: &Tags, hashes: HashPolicy<'_>, ) -> Result { - let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; - // Fetch the revision for the source distribution. - let LocalRevisionPointer { - cache_info, - revision, - } = self + let provisional = self .archive_revision(source, resource, cache_shard, hashes) .await?; + let cache_info = provisional.cache_info().clone(); + let revision = provisional.revision().clone(); // Before running the build, check that the hashes match. if !revision.satisfies(hashes) { @@ -882,7 +1030,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Scope all operations to the revision. Within the revision, there's no need to check for // freshness, since entries have to be fresher than the revision itself. - let cache_shard = cache_shard.shard(revision.id()); + let lock_shard = cache_shard; + let cache_shard = lock_shard.shard(revision.id()); let source_entry = cache_shard.entry(SOURCE); // If there are build settings or extra build dependencies, we need to scope to a cache shard. @@ -917,6 +1066,46 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { self.heal_archive_revision(source, resource, &source_entry, revision, hashes) .await? }; + let provisional = LocalRevisionPointer { + cache_info, + revision, + }; + + // Acquire the concurrency permit and advisory lock. 
+ let _permit = self.acquire_concurrency_permit().await; + let _lock = lock_shard.lock().await.map_err(Error::CacheLock)?; + + let canonical = self + .commit_canonical_local_revision_pointer(lock_shard, provisional, hashes) + .await?; + let cache_info = canonical.cache_info().clone(); + let revision = canonical.revision().clone(); + let cache_shard = lock_shard.shard(revision.id()); + let source_entry = cache_shard.entry(SOURCE); + let revision = if source_entry.path().is_dir() { + revision + } else { + self.heal_archive_revision(source, resource, &source_entry, revision, hashes) + .await? + }; + let cache_shard = build_info + .cache_shard() + .map(|digest| cache_shard.shard(digest)) + .unwrap_or(cache_shard); + + // Re-check the cache under lock to avoid duplicate builds across concurrent tasks. + if let Some(file) = BuiltWheelFile::find_in_cache(tags, &cache_shard) + .ok() + .flatten() + .filter(|file| file.matches(source.name(), source.version())) + { + return Ok(BuiltWheelMetadata::from_file( + file, + revision.into_hashes(), + cache_info, + build_info, + )); + } let task = self .reporter @@ -966,12 +1155,12 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { cache_shard: &CacheShard, hashes: HashPolicy<'_>, ) -> Result { - let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; - // Fetch the revision for the source distribution. - let LocalRevisionPointer { revision, .. } = self + let provisional = self .archive_revision(source, resource, cache_shard, hashes) .await?; + let cache_info = provisional.cache_info().clone(); + let revision = provisional.revision().clone(); // Before running the build, check that the hashes match. if !revision.satisfies(hashes) { @@ -984,7 +1173,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Scope all operations to the revision. Within the revision, there's no need to check for // freshness, since entries have to be fresher than the revision itself. 
- let cache_shard = cache_shard.shard(revision.id()); + let lock_shard = cache_shard; + let cache_shard = lock_shard.shard(revision.id()); let source_entry = cache_shard.entry(SOURCE); // If the metadata is static, return it. @@ -1001,21 +1191,15 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // If the cache contains compatible metadata, return it. let metadata_entry = cache_shard.entry(METADATA); - match CachedMetadata::read(&metadata_entry).await { - Ok(Some(metadata)) => { - if metadata.matches(source.name(), source.version()) { - debug!("Using cached metadata for: {source}"); - return Ok(ArchiveMetadata { - metadata: Metadata::from_metadata23(metadata.into()), - hashes: revision.into_hashes(), - }); - } - debug!("Cached metadata does not match expected name and version for: {source}"); - } - Ok(None) => {} - Err(err) => { - debug!("Failed to deserialize cached metadata for: {source} ({err})"); - } + + if let Some(metadata) = self + .read_matching_cached_metadata(source, &metadata_entry) + .await + { + return Ok(ArchiveMetadata { + metadata: Metadata::from_metadata23(metadata.into()), + hashes: revision.into_hashes(), + }); } // Otherwise, we need a source distribution. @@ -1025,6 +1209,39 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { self.heal_archive_revision(source, resource, &source_entry, revision, hashes) .await? }; + let provisional = LocalRevisionPointer { + cache_info, + revision, + }; + + // Acquire the concurrency permit and advisory lock. 
+ let _permit = self.acquire_concurrency_permit().await; + let _lock = lock_shard.lock().await.map_err(Error::CacheLock)?; + + let canonical = self + .commit_canonical_local_revision_pointer(lock_shard, provisional, hashes) + .await?; + let revision = canonical.into_revision(); + let cache_shard = lock_shard.shard(revision.id()); + let source_entry = cache_shard.entry(SOURCE); + let revision = if source_entry.path().is_dir() { + revision + } else { + self.heal_archive_revision(source, resource, &source_entry, revision, hashes) + .await? + }; + let metadata_entry = cache_shard.entry(METADATA); + + // Re-check the cache under lock to avoid duplicate builds across concurrent tasks. + if let Some(metadata) = self + .read_matching_cached_metadata(source, &metadata_entry) + .await + { + return Ok(ArchiveMetadata { + metadata: Metadata::from_metadata23(metadata.into()), + hashes: revision.into_hashes(), + }); + } // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self @@ -1153,14 +1370,10 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Include the hashes and cache info in the revision. let revision = revision.with_hashes(HashDigests::from(hashes)); - // Persist the revision. - let pointer = LocalRevisionPointer { + Ok(LocalRevisionPointer { cache_info, revision, - }; - pointer.write_to(&revision_entry).await?; - - Ok(pointer) + }) } /// Build a source distribution from a local source tree (i.e., directory), either editable or @@ -1186,7 +1399,43 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { }, ); - // Acquire the advisory lock. + // If there are build settings or extra build dependencies, we need to scope to a cache shard. 
+ let config_settings = self.config_settings_for(source.name()); + let extra_build_deps = self.extra_build_dependencies_for(source.name()); + let extra_build_variables = self.extra_build_variables_for(source.name()); + let build_info = + BuildInfo::from_settings(&config_settings, extra_build_deps, extra_build_variables); + + // If the cache contains a compatible wheel, return it. + if let Some(LocalRevisionPointer { + cache_info, + revision, + }) = self.read_fresh_source_tree_revision(source, resource, &cache_shard)? + { + // Scope all operations to the revision. Within the revision, there's no need to check + // for freshness, since entries have to be fresher than the revision itself. + let revision_shard = cache_shard.shard(revision.id()); + let cache_shard = build_info + .cache_shard() + .map(|digest| revision_shard.shard(digest)) + .unwrap_or(revision_shard); + + if let Some(file) = BuiltWheelFile::find_in_cache(tags, &cache_shard) + .ok() + .flatten() + .filter(|file| file.matches(source.name(), source.version())) + { + return Ok(BuiltWheelMetadata::from_file( + file, + revision.into_hashes(), + cache_info, + build_info, + )); + } + } + + // Acquire the concurrency permit and advisory lock. + let _permit = self.acquire_concurrency_permit().await; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; // Fetch the revision for the source distribution. @@ -1199,20 +1448,13 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // Scope all operations to the revision. Within the revision, there's no need to check for // freshness, since entries have to be fresher than the revision itself. - let cache_shard = cache_shard.shard(revision.id()); - - // If there are build settings or extra build dependencies, we need to scope to a cache shard. 
- let config_settings = self.config_settings_for(source.name()); - let extra_build_deps = self.extra_build_dependencies_for(source.name()); - let extra_build_variables = self.extra_build_variables_for(source.name()); - let build_info = - BuildInfo::from_settings(&config_settings, extra_build_deps, extra_build_variables); + let revision_shard = cache_shard.shard(revision.id()); let cache_shard = build_info .cache_shard() - .map(|digest| cache_shard.shard(digest)) - .unwrap_or(cache_shard); + .map(|digest| revision_shard.shard(digest)) + .unwrap_or(revision_shard); - // If the cache contains a compatible wheel, return it. + // Re-check the cache under lock to avoid duplicate builds across concurrent tasks. if let Some(file) = BuiltWheelFile::find_in_cache(tags, &cache_shard) .ok() .flatten() @@ -1310,7 +1552,45 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { }, ); - // Acquire the advisory lock. + // If the cache contains compatible metadata, return it. + if let Some(LocalRevisionPointer { revision, .. }) = + self.read_fresh_source_tree_revision(source, resource, &cache_shard)? + { + // Scope all operations to the revision. Within the revision, there's no need to check + // for freshness, since entries have to be fresher than the revision itself. + let cache_shard = cache_shard.shard(revision.id()); + let metadata_entry = cache_shard.entry(METADATA); + + if let Some(metadata) = self + .read_matching_cached_metadata(source, &metadata_entry) + .await + { + // If necessary, mark the metadata as dynamic. + let metadata = if dynamic { + ResolutionMetadata { + dynamic: true, + ..metadata.into() + } + } else { + metadata.into() + }; + return Ok(ArchiveMetadata::from( + Metadata::from_workspace( + metadata, + resource.install_path, + None, + self.build_context.locations(), + self.build_context.sources().clone(), + self.build_context.workspace_cache(), + credentials_cache, + ) + .await?, + )); + } + } + + // Acquire the concurrency permit and advisory lock. 
+ let _permit = self.acquire_concurrency_permit().await; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; // Fetch the revision for the source distribution. @@ -1322,41 +1602,33 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { // freshness, since entries have to be fresher than the revision itself. let cache_shard = cache_shard.shard(revision.id()); - // If the cache contains compatible metadata, return it. + // Re-check the cache under lock to avoid duplicate builds across concurrent tasks. let metadata_entry = cache_shard.entry(METADATA); - match CachedMetadata::read(&metadata_entry).await { - Ok(Some(metadata)) => { - if metadata.matches(source.name(), source.version()) { - debug!("Using cached metadata for: {source}"); - - // If necessary, mark the metadata as dynamic. - let metadata = if dynamic { - ResolutionMetadata { - dynamic: true, - ..metadata.into() - } - } else { - metadata.into() - }; - return Ok(ArchiveMetadata::from( - Metadata::from_workspace( - metadata, - resource.install_path, - None, - self.build_context.locations(), - self.build_context.sources().clone(), - self.build_context.workspace_cache(), - credentials_cache, - ) - .await?, - )); + if let Some(metadata) = self + .read_matching_cached_metadata(source, &metadata_entry) + .await + { + // If necessary, mark the metadata as dynamic. 
+ let metadata = if dynamic { + ResolutionMetadata { + dynamic: true, + ..metadata.into() } - debug!("Cached metadata does not match expected name and version for: {source}"); - } - Ok(None) => {} - Err(err) => { - debug!("Failed to deserialize cached metadata for: {source} ({err})"); - } + } else { + metadata.into() + }; + return Ok(ArchiveMetadata::from( + Metadata::from_workspace( + metadata, + resource.install_path, + None, + self.build_context.locations(), + self.build_context.sources().clone(), + self.build_context.workspace_cache(), + credentials_cache, + ) + .await?, + )); } // If the backend supports `prepare_metadata_for_build_wheel`, use it. @@ -1464,6 +1736,42 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { )) } + /// Read the [`Revision`] for a local source tree from the cache, if it is fresh and matches + /// the expected cache info. + fn read_fresh_source_tree_revision( + &self, + source: &BuildableSource<'_>, + resource: &DirectorySourceUrl<'_>, + cache_shard: &CacheShard, + ) -> Result<Option<LocalRevisionPointer>, Error> { + // Verify that the source tree exists. + if !resource.install_path.is_dir() { + return Err(Error::NotFound(resource.url.clone())); + } + + // Read the existing metadata from the cache. + let entry = cache_shard.entry(LOCAL_REVISION); + + // If the revision isn't fresh, it can't be used. + if !self + .build_context + .cache() + .freshness(&entry, source.name(), source.source_tree()) + .map_err(Error::CacheRead)? + .is_fresh() + { + return Ok(None); + } + + // Determine the last-modified time of the source distribution. + let cache_info = CacheInfo::from_directory(resource.install_path)?; + Ok(Self::read_matching_local_revision_pointer( + source, + &entry, + &cache_info, + )) + } + /// Return the [`Revision`] for a local source tree, refreshing it if necessary. async fn source_tree_revision( &self, @@ -1490,18 +1798,10 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { .map_err(Error::CacheRead)?
.is_fresh() { - match LocalRevisionPointer::read_from(&entry) { - Ok(Some(pointer)) => { - if *pointer.cache_info() == cache_info { - return Ok(pointer); - } - - debug!("Cached revision does not match expected cache info for: {source}"); - } - Ok(None) => {} - Err(err) => { - debug!("Failed to deserialize cached revision for: {source} ({err})"); - } + if let Some(pointer) = + Self::read_matching_local_revision_pointer(source, &entry, &cache_info) + { + return Ok(pointer); + } } @@ -1516,6 +1816,28 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { Ok(pointer) } + fn read_matching_local_revision_pointer( + source: &BuildableSource<'_>, + entry: &CacheEntry, + cache_info: &CacheInfo, + ) -> Option<LocalRevisionPointer> { + match LocalRevisionPointer::read_from(entry) { + Ok(Some(pointer)) => { + if pointer.cache_info() == cache_info { + Some(pointer) + } else { + debug!("Cached revision does not match expected cache info for: {source}"); + None + } + } + Ok(None) => None, + Err(err) => { + debug!("Failed to deserialize cached revision for: {source} ({err})"); + None + } + } + } + /// Return the [`RequiresDist`] from a `pyproject.toml`, if it can be statically extracted. pub(crate) async fn source_tree_requires_dist( &self, @@ -1615,7 +1937,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { ); let metadata_entry = cache_shard.entry(METADATA); - // Acquire the advisory lock. + // Acquire the concurrency permit and advisory lock. + let _permit = self.acquire_concurrency_permit().await; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; // We don't track any cache information for Git-based source distributions; they're assumed @@ -1830,7 +2153,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { ); let metadata_entry = cache_shard.entry(METADATA); - // Acquire the advisory lock. + // Acquire the concurrency permit and advisory lock.
+ let _permit = self.acquire_concurrency_permit().await; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?; let path = if let Some(subdirectory) = resource.subdirectory { @@ -1873,36 +2197,26 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> { .map_err(Error::CacheRead)? .is_fresh() { - match CachedMetadata::read(&metadata_entry).await { - Ok(Some(metadata)) => { - if metadata.matches(source.name(), source.version()) { - debug!("Using cached metadata for: {source}"); - - let git_member = GitWorkspaceMember { - fetch_root: fetch.path(), - git_source: resource, - }; - return Ok(ArchiveMetadata::from( - Metadata::from_workspace( - metadata.into(), - &path, - Some(&git_member), - self.build_context.locations(), - self.build_context.sources().clone(), - self.build_context.workspace_cache(), - credentials_cache, - ) - .await?, - )); - } - debug!( - "Cached metadata does not match expected name and version for: {source}" - ); - } - Ok(None) => {} - Err(err) => { - debug!("Failed to deserialize cached metadata for: {source} ({err})"); - } + if let Some(metadata) = self + .read_matching_cached_metadata(source, &metadata_entry) + .await + { + let git_member = GitWorkspaceMember { + fetch_root: fetch.path(), + git_source: resource, + }; + return Ok(ArchiveMetadata::from( + Metadata::from_workspace( + metadata.into(), + &path, + Some(&git_member), + self.build_context.locations(), + self.build_context.sources().clone(), + self.build_context.workspace_cache(), + credentials_cache, + ) + .await?, + )); } } diff --git a/crates/uv/src/commands/pip/operations.rs b/crates/uv/src/commands/pip/operations.rs index fa8c12d12dcb1..ba49e5b236938 100644 --- a/crates/uv/src/commands/pip/operations.rs +++ b/crates/uv/src/commands/pip/operations.rs @@ -160,6 +160,7 @@ pub(crate) async fn resolve( client, build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), ) 
.with_reporter(Arc::new(ResolverReporter::from(printer))) @@ -178,6 +179,7 @@ pub(crate) async fn resolve( client, build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), ) .with_reporter(Arc::new(ResolverReporter::from(printer))) @@ -290,6 +292,7 @@ pub(crate) async fn resolve( client, build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), ) .with_reporter(Arc::new(ResolverReporter::from(printer))) @@ -325,6 +328,7 @@ pub(crate) async fn resolve( client, build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), ) .with_reporter(Arc::new(ResolverReporter::from(printer))) @@ -377,6 +381,7 @@ pub(crate) async fn resolve( client, build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), )? .with_reporter(Arc::new(reporter)); @@ -773,6 +778,7 @@ async fn execute_plan( client, build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), ) .with_reporter(Arc::new( diff --git a/crates/uv/src/commands/project/add.rs b/crates/uv/src/commands/project/add.rs index 25936cfb6cf41..e2287d93e8269 100644 --- a/crates/uv/src/commands/project/add.rs +++ b/crates/uv/src/commands/project/add.rs @@ -468,6 +468,7 @@ pub(crate) async fn add( &client, &build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), ) .with_reporter(Arc::new(ResolverReporter::from(printer))) diff --git a/crates/uv/src/commands/project/lock.rs b/crates/uv/src/commands/project/lock.rs index 0067b7322f8cd..865494c98a112 100644 --- a/crates/uv/src/commands/project/lock.rs +++ b/crates/uv/src/commands/project/lock.rs @@ -802,6 +802,7 @@ async fn do_lock( &client, &build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ); // If any of the 
resolution-determining settings changed, invalidate the lock. diff --git a/crates/uv/src/commands/project/mod.rs b/crates/uv/src/commands/project/mod.rs index 5cc73315453aa..8cc6d85b77408 100644 --- a/crates/uv/src/commands/project/mod.rs +++ b/crates/uv/src/commands/project/mod.rs @@ -1939,6 +1939,7 @@ pub(crate) async fn resolve_names( &client, &build_dispatch, concurrency.downloads_semaphore.clone(), + concurrency.source_distribution_semaphore.clone(), ), ) .with_reporter(Arc::new(ResolverReporter::from(printer)))