Merged
1 change: 1 addition & 0 deletions Cargo.lock


8 changes: 6 additions & 2 deletions crates/uv-bench/benches/uv.rs
@@ -203,7 +203,7 @@ mod resolver {
exclude_newer,
sources,
workspace_cache,
concurrency,
concurrency.clone(),
Preview::default(),
);

@@ -226,7 +226,11 @@ mod resolver {
&hashes,
&build_context,
installed_packages,
DistributionDatabase::new(client, &build_context, concurrency.downloads),
DistributionDatabase::new(
client,
&build_context,
concurrency.downloads_semaphore.clone(),
),
)?;

Ok(resolver.resolve().await?)
6 changes: 3 additions & 3 deletions crates/uv-build-frontend/src/lib.rs
@@ -223,11 +223,11 @@ pub struct SourceBuildContext {
}

impl SourceBuildContext {
/// Create a [`SourceBuildContext`] with the given concurrent build limit.
pub fn new(concurrent_builds: usize) -> Self {
/// Create a [`SourceBuildContext`] with the given shared concurrency semaphore.
pub fn new(concurrent_build_slots: Arc<Semaphore>) -> Self {
Self {
default_resolution: Arc::default(),
concurrent_build_slots: Arc::new(Semaphore::new(concurrent_builds)),
concurrent_build_slots,
}
}
}
1 change: 1 addition & 0 deletions crates/uv-configuration/Cargo.toml
@@ -37,6 +37,7 @@ rustc-hash = { workspace = true }
same-file = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
tokio = { workspace = true }
serde-untagged = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
40 changes: 34 additions & 6 deletions crates/uv-configuration/src/concurrency.rs
@@ -1,7 +1,12 @@
use std::fmt;
use std::num::NonZeroUsize;
use std::sync::Arc;

use tokio::sync::Semaphore;

/// Concurrency limit settings.
#[derive(Copy, Clone, Debug)]
// TODO(konsti): We should find a pattern that doesn't require having both semaphores and counts.
Member:

Why do we require both semaphores and counts right now?

Member Author:

There's rayon (no tokio semaphores) and for the tokio part there's a buffer_unordered that needs a number as limit.

Contributor:

For Rayon we may be able to use the current tokio runtime handle to block_on the semaphore acquire call. This would block that rayon thread, but this would in effect mean that if there was something which had also spawned a rayon thread pool, there would only concurrency.whatever worth of actively working threads and the other threads would be sleeping. This isn't quite as good as having rayon limit its thread pool size somehow, but it's probably sufficient to ensure we don't do too much stuff at the same time.

Regarding buffer_unordered, I worry that it could lead to us having futures which aren't getting polled if we ever do async work for each complete item of work¹. And I think we should just use a slightly less error-prone abstraction for that case.

I'd say that fixing that would probably fall outside of this PR though.

Footnotes

  1. @oconnor663 calls this "snoozing". There's also a good write-up of this case here: https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/barbara_battles_buffered_streams.html I'm not certain we don't run into this somewhere, but for the places I have looked at, we don't.
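The block-on-the-semaphore-from-rayon idea above can be illustrated without tokio. This is a minimal std-only sketch, not uv's code: `BlockingSemaphore` is a hypothetical stand-in for `tokio::sync::Semaphore` (which only offers an async acquire, hence the `block_on` suggestion). Eight worker threads contend for two permits, and a high-water counter confirms the cap holds.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Minimal blocking counting semaphore (hypothetical stand-in for
/// `tokio::sync::Semaphore`).
pub struct BlockingSemaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl BlockingSemaphore {
    pub fn new(permits: usize) -> Self {
        Self { permits: Mutex::new(permits), cv: Condvar::new() }
    }

    /// Block the calling thread until a permit is free.
    pub fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap();
        }
        *p -= 1;
    }

    pub fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

fn main() {
    // Eight "rayon" threads contend for two permits; (current, max) tracks
    // the high-water mark of concurrently active workers.
    let sem = Arc::new(BlockingSemaphore::new(2));
    let active = Arc::new(Mutex::new((0usize, 0usize)));

    let handles: Vec<_> = (0..8)
        .map(|_| {
            let sem = Arc::clone(&sem);
            let active = Arc::clone(&active);
            thread::spawn(move || {
                sem.acquire();
                {
                    let mut a = active.lock().unwrap();
                    a.0 += 1;
                    a.1 = a.1.max(a.0);
                }
                thread::sleep(Duration::from_millis(10)); // pretend to work
                active.lock().unwrap().0 -= 1;
                sem.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let max = active.lock().unwrap().1;
    assert!(max <= 2, "more than two workers ran concurrently");
    println!("max concurrent workers: {max}");
}
```

As the comment notes, the blocked threads still exist and sleep; the permit count, not the pool size, is what bounds active work.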

Contributor:

On that note, it feels weird to re-use this one type to hold both what the user configured and the semaphores. I imagine it would be a bit of churn but we could add a new type for the user facing side, avoid the "Debug" hack below, and simply have a function which creates a Concurrency (could be renamed if necessary) from a ConcurrencyConfig or whatever.

Member Author:

> For Rayon we may be able to use the current tokio runtime handle to block_on the semaphore acquire call. This would block that rayon thread, but this would in effect mean that if there was something which had also spawned a rayon thread pool, there would only concurrency.whatever worth of actively working threads and the other threads would be sleeping. This isn't quite as good as having rayon limit its thread pool size somehow, but it's probably sufficient to ensure we don't do too much stuff at the same time.

My approach is kinda from the other end: As we mix different schedulers/runtimes and given that rayon has the global initialization, a bit of data duplication really isn't that bad. I'd add even more complexity to this type, if it reduces complexity in the sites where we run business logic.

> Regarding buffer_unordered, I worry that it could lead to us having futures which aren't getting polled if we ever do async work for each complete item of work. And I think we should just use a slightly less error-prone abstraction for that case.

Going off the original story because it's interesting: don't we avoid this problem by buffer_unordered being the last in the chain for us? As in, could Barbara have done this:

async fn do_work(database: &Database) -> Result<(), Error> {
    let work = do_select(database, FIND_WORK_QUERY).await?;

    // Borrow so the semaphore isn't moved into the first closure call.
    let work_limit = &Semaphore::new(1);

    stream::iter(work)
        .map(|item| async move {
            let work_item = do_select(database, work_from_item(item)).await;
            let _permit = work_limit.acquire().await.unwrap();
            process_work_item(database, work_item).await;
        })
        .buffered(5)
        .for_each(|()| std::future::ready(()))
        .await;

    Ok(())
}

More practically, should we drop buffer_unordered and just collect all the futures into a FuturesUnordered directly, given that the limit is now internal, and ideally moves into the client eventually?

Contributor (@EliteTK, Feb 25, 2026):

My point regarding rayon was not really about duplication, just about the fact that if we use a semaphore in some places and we just use the raw number in other places, we're still doing more work than what we claim we want to do, if we ever end up using both concurrently. So if we set up a rayon thread pool with x threads and we have a semaphore with x slots and we do work in the thread pool and outside with the semaphore then we're doing up to 2x worth of work. If we then also use x in a third place where we use it for buffer_unordered we're doing up to 3x worth of work.

If the goal is to only ever do up to 1x worth of work, then we'd need everything to use the semaphore.

Regarding your suggestion regarding Barbara's code. It would fix the issue, but it wouldn't quite preserve the semantics, although it's maybe a better solution than what that page suggests.

The issue is that once a do_select completes, we would ideally want to start another do_select immediately while we process_work_item on the one that just completed. In your version, the actual number of concurrent do_select calls will only be 4 while a process_work_item is being progressed. In the extreme case, if we wanted to concurrently do up to one select and process up to one work item, we would actually only ever be doing one or the other.

However, there is still the question of how to make the "5" in that code (passed to buffered(5)) use a semaphore instead. One option is to create all the futures for all work items and then use the approach you have shown there, but this has the possibility of taking up quite a lot of memory if the futures have a lot of state. That's one of the reasons why .buffered(5) there is preferred as we're only ever holding 5 futures for the work tasks at a time.

It's a bit tricky to figure out how to limit the number of futures you have to keep around based on the number of semaphore slots you have. But we could limit the number of futures to the maximum concurrency we could ever have, and only progress them when we can acquire the semaphore... Something like:

let concurrency_limit = 5;
let concurrency_limit_semaphore = &Semaphore::new(concurrency_limit);
let post_processing_limit = 1;
let post_processing_limit_semaphore = &Semaphore::new(post_processing_limit);
stream::iter(work)
    .map(|item| async move {
        let result = {
            let _permit = concurrency_limit_semaphore.acquire().await.unwrap();
            do_work_on(item).await
        };
        let _permit = post_processing_limit_semaphore.acquire().await.unwrap();
        post_process(result).await;
    })
    .buffer_unordered(concurrency_limit)
    .for_each(|()| std::future::ready(()))
    .await;

So I guess we could, with some caveats, use buffer_unordered after all, but if we do use it, we should wrap it up in some abstraction which doesn't require us to keep writing something like the above everywhere. It's probably a good solution for the time being. We could even do concurrency_limit + post_processing_limit in the buffer_unordered to solve the issue I mentioned above with us doing less work than we intend.
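The "up to 2x worth of work" accounting from this comment can be demonstrated deterministically with std only. This is a sketch under stated assumptions: the token-channel semaphore and all names are hypothetical stand-ins for `tokio::sync::Semaphore`, not uv's actual code. Two independent limits of two permits each allow four workers to be active simultaneously, which the barrier proves by completing only once all four hold a permit at the same time.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Token-bucket semaphore: a channel preloaded with `n` tokens.
/// Hypothetical std-only stand-in for `tokio::sync::Semaphore`.
fn semaphore(n: usize) -> (SyncSender<()>, Arc<Mutex<Receiver<()>>>) {
    let (tx, rx) = sync_channel(n);
    for _ in 0..n {
        tx.send(()).unwrap();
    }
    (tx, Arc::new(Mutex::new(rx)))
}

fn main() {
    // Two *independent* limits of 2 (think "rayon threads" vs "semaphore
    // slots"): four workers in total can be active at once. The barrier of 4
    // only completes if all four hold a permit simultaneously.
    let (_dl_tx, dl_rx) = semaphore(2);
    let (_build_tx, build_rx) = semaphore(2);
    let barrier = Arc::new(Barrier::new(4));

    let mut handles = Vec::new();
    for i in 0..4 {
        let rx = if i < 2 { Arc::clone(&dl_rx) } else { Arc::clone(&build_rx) };
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            let token = rx.lock().unwrap().recv().unwrap(); // acquire a permit
            barrier.wait(); // reached only while all four hold permits
            drop(token); // release
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    println!("with split limits of 2+2, four tasks ran concurrently");
}
```

With a single shared limit of two, the same barrier would deadlock, which is exactly the "everything must use the semaphore" point above.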

Member Author:

> My point regarding rayon was not really about duplication, just about the fact that if we use a semaphore in some places and we just use the raw number in other places, we're still doing more work than what we claim we want to do, if we ever end up using both concurrently. So if we set up a rayon thread pool with x threads and we have a semaphore with x slots and we do work in the thread pool and outside with the semaphore then we're doing up to 2x worth of work. If we then also use x in a third place where we use it for buffer_unordered we're doing up to 3x worth of work.
>
> If the goal is to only ever do up to 1x worth of work, then we'd need everything to use the semaphore.

I see! In our case installs are rayon, and only installs (and unzip, but that's also install except for legacy source dists) use rayon, so there's no semaphore for rayon.

[...]

> So I guess we could, with some caveats, use buffer_unordered after all, but if we do use it, we should wrap it up in some abstraction which doesn't require us to keep writing something like the above everywhere. It's probably a good solution for the time being. We could even do concurrency_limit + post_processing_limit in the buffer_unordered to solve the issue I mentioned above with us doing less work than we intend.

Thanks for the explanation!

#[derive(Clone)]
pub struct Concurrency {
/// The maximum number of concurrent downloads.
///
@@ -15,22 +20,45 @@ pub struct Concurrency {
///
/// Note this value must be non-zero.
pub installs: usize,
/// A global semaphore to limit the number of concurrent downloads.
pub downloads_semaphore: Arc<Semaphore>,
/// A global semaphore to limit the number of concurrent builds.
pub builds_semaphore: Arc<Semaphore>,
}

/// Custom `Debug` to hide semaphore fields from `--show-settings` output.
#[expect(clippy::missing_fields_in_debug)]
impl fmt::Debug for Concurrency {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Concurrency")
.field("downloads", &self.downloads)
.field("builds", &self.builds)
.field("installs", &self.installs)
.finish()
}
}

impl Default for Concurrency {
fn default() -> Self {
Self {
downloads: Self::DEFAULT_DOWNLOADS,
builds: Self::threads(),
installs: Self::threads(),
}
Self::new(Self::DEFAULT_DOWNLOADS, Self::threads(), Self::threads())
}
}

impl Concurrency {
// The default concurrent downloads limit.
pub const DEFAULT_DOWNLOADS: usize = 50;

/// Create a new [`Concurrency`] with the given limits.
pub fn new(downloads: usize, builds: usize, installs: usize) -> Self {
Self {
downloads,
builds,
installs,
downloads_semaphore: Arc::new(Semaphore::new(downloads)),
builds_semaphore: Arc::new(Semaphore::new(builds)),
}
}

// The default concurrent builds and install limit.
pub fn threads() -> usize {
std::thread::available_parallelism()
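The shape this diff gives `Concurrency` can be sketched with std types only. This is a hedged illustration, not the PR's code: an `AtomicUsize` permit counter stands in for `tokio::sync::Semaphore`, and only the downloads limit is modeled. The point is that `clone()` now hands every call site a handle to the same limiter, whereas the old pattern of each site calling `Semaphore::new(n)` produced independent limits.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Sketch of the new design: the plain count is kept for call sites that
/// need a number (rayon pool sizing, buffer_unordered), while the shared
/// handle is what components gate work on. `AtomicUsize` is a hypothetical
/// stand-in for `tokio::sync::Semaphore`.
#[derive(Clone)]
struct Concurrency {
    downloads: usize,
    downloads_limiter: Arc<AtomicUsize>, // permits remaining
}

impl Concurrency {
    fn new(downloads: usize) -> Self {
        Self {
            downloads,
            downloads_limiter: Arc::new(AtomicUsize::new(downloads)),
        }
    }
}

fn main() {
    let a = Concurrency::new(50);
    let b = a.clone(); // what call sites now do instead of `Semaphore::new(n)`

    // Both clones point at the *same* limiter: a permit taken through one
    // is observed through the other.
    assert!(Arc::ptr_eq(&a.downloads_limiter, &b.downloads_limiter));
    a.downloads_limiter.fetch_sub(1, Ordering::SeqCst);
    assert_eq!(b.downloads_limiter.load(Ordering::SeqCst), 49);

    // The old design, each site building its own limit, would not share:
    let old_a = Arc::new(AtomicUsize::new(50));
    let old_b = Arc::new(AtomicUsize::new(50));
    assert!(!Arc::ptr_eq(&old_a, &old_b));
}
```

This is also why the call sites in this PR change from `concurrency` (a `Copy`) to `concurrency.clone()`: the type now owns `Arc`s and is no longer `Copy`, but cloning is cheap and preserves sharing.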
18 changes: 13 additions & 5 deletions crates/uv-dispatch/src/lib.rs
@@ -147,7 +147,7 @@ impl<'a> BuildDispatch<'a> {
build_options,
hasher,
exclude_newer,
source_build_context: SourceBuildContext::new(concurrency.builds),
source_build_context: SourceBuildContext::new(concurrency.builds_semaphore.clone()),
build_extra_env_vars: FxHashMap::default(),
sources,
workspace_cache,
@@ -264,8 +264,12 @@ impl BuildContext for BuildDispatch<'_> {
self.hasher,
self,
EmptyInstalledPackages,
DistributionDatabase::new(self.client, self, self.concurrency.downloads)
.with_build_stack(build_stack),
DistributionDatabase::new(
self.client,
self,
self.concurrency.downloads_semaphore.clone(),
)
.with_build_stack(build_stack),
)?;
let resolution = Resolution::from(resolver.resolve().await.with_context(|| {
format!(
@@ -353,8 +357,12 @@ impl BuildContext for BuildDispatch<'_> {
tags,
self.hasher,
self.build_options,
DistributionDatabase::new(self.client, self, self.concurrency.downloads)
.with_build_stack(build_stack),
DistributionDatabase::new(
self.client,
self,
self.concurrency.downloads_semaphore.clone(),
)
.with_build_stack(build_stack),
);

debug!(
12 changes: 6 additions & 6 deletions crates/uv-distribution/src/distribution_database.rs
@@ -58,12 +58,12 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
pub fn new(
client: &'a RegistryClient,
build_context: &'a Context,
concurrent_downloads: usize,
downloads_semaphore: Arc<Semaphore>,
) -> Self {
Self {
build_context,
builder: SourceDistributionBuilder::new(build_context),
client: ManagedClient::new(client, concurrent_downloads),
client: ManagedClient::new(client, downloads_semaphore),
reporter: None,
}
}
@@ -1151,15 +1151,15 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
/// A wrapper around `RegistryClient` that manages a concurrency limit.
pub struct ManagedClient<'a> {
pub unmanaged: &'a RegistryClient,
control: Semaphore,
control: Arc<Semaphore>,
}

impl<'a> ManagedClient<'a> {
/// Create a new `ManagedClient` using the given client and concurrency limit.
fn new(client: &'a RegistryClient, concurrency: usize) -> Self {
/// Create a new `ManagedClient` using the given client and concurrency semaphore.
fn new(client: &'a RegistryClient, control: Arc<Semaphore>) -> Self {
ManagedClient {
unmanaged: client,
control: Semaphore::new(concurrency),
control,
}
}

8 changes: 4 additions & 4 deletions crates/uv/src/commands/build_frontend.rs
@@ -146,7 +146,7 @@ pub(crate) async fn build_frontend(
no_config,
python_preference,
python_downloads,
concurrency,
&concurrency,
cache,
printer,
preview,
@@ -193,7 +193,7 @@ async fn build_impl(
no_config: bool,
python_preference: PythonPreference,
python_downloads: PythonDownloads,
concurrency: Concurrency,
concurrency: &Concurrency,
cache: &Cache,
printer: Printer,
preview: Preview,
@@ -469,7 +469,7 @@ async fn build_package(
keyring_provider: KeyringProviderType,
exclude_newer: ExcludeNewer,
sources: NoSources,
concurrency: Concurrency,
concurrency: &Concurrency,
build_options: &BuildOptions,
sdist: bool,
wheel: bool,
@@ -630,7 +630,7 @@ async fn build_package(
exclude_newer,
sources.clone(),
workspace_cache,
concurrency,
concurrency.clone(),
preview,
);

4 changes: 2 additions & 2 deletions crates/uv/src/commands/pip/compile.rs
@@ -540,7 +540,7 @@ pub(crate) async fn pip_compile(
exclude_newer.clone(),
sources,
WorkspaceCache::default(),
concurrency,
concurrency.clone(),
preview,
);

@@ -580,7 +580,7 @@
&flat_index,
&top_level_index,
&build_dispatch,
concurrency,
&concurrency,
options,
Box::new(DefaultResolveLogger),
printer,
8 changes: 4 additions & 4 deletions crates/uv/src/commands/pip/install.rs
@@ -486,7 +486,7 @@ pub(crate) async fn pip_install(
exclude_newer.clone(),
sources.clone(),
WorkspaceCache::default(),
concurrency,
concurrency.clone(),
preview,
);

@@ -595,7 +595,7 @@ pub(crate) async fn pip_install(
&flat_index,
state.index(),
&build_dispatch,
concurrency,
&concurrency,
options,
Box::new(DefaultResolveLogger),
printer,
@@ -640,7 +640,7 @@ pub(crate) async fn pip_install(
exclude_newer.clone(),
sources,
WorkspaceCache::default(),
concurrency,
concurrency.clone(),
preview,
);

Expand All @@ -658,7 +658,7 @@ pub(crate) async fn pip_install(
&tags,
&client,
state.in_flight(),
concurrency,
&concurrency,
&build_dispatch,
&cache,
&environment,
3 changes: 1 addition & 2 deletions crates/uv/src/commands/pip/list.rs
@@ -7,7 +7,6 @@ use itertools::Itertools;
use owo_colors::OwoColorize;
use rustc_hash::{FxHashMap, FxHashSet};
use serde::Serialize;
use tokio::sync::Semaphore;
use tracing::debug;
use unicode_width::UnicodeWidthStr;

@@ -117,7 +116,7 @@ pub(crate) async fn pip_list(
.markers(environment.interpreter().markers())
.platform(environment.interpreter().platform())
.build();
let download_concurrency = Semaphore::new(concurrency.downloads);
let download_concurrency = concurrency.downloads_semaphore.clone();

// Determine the platform tags.
let interpreter = environment.interpreter();