Skip to content

Commit

Permalink
switch to current-thread tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev committed Jul 9, 2024
1 parent 72dd34b commit 79e313a
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ tempfile = { version = "3.9.0" }
textwrap = { version = "0.16.1" }
thiserror = { version = "1.0.56" }
tl = { version = "0.7.7" }
tokio = { version = "1.35.1", features = ["fs", "io-util", "macros", "process", "rt-multi-thread", "sync"] }
tokio = { version = "1.35.1", features = ["fs", "io-util", "macros", "process", "sync"] }
tokio-stream = { version = "0.1.14" }
tokio-tar = { version = "0.3.1" }
tokio-util = { version = "0.7.10", features = ["compat"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/bench/benches/uv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn resolve_warm_jupyter(c: &mut Criterion<WallTime>) {
}

fn resolve_warm_airflow(c: &mut Criterion<WallTime>) {
let runtime = &tokio::runtime::Builder::new_multi_thread()
let runtime = &tokio::runtime::Builder::new_current_thread()
// CodSpeed limits the total number of threads to 500
.max_blocking_threads(256)
.enable_all()
Expand Down
7 changes: 4 additions & 3 deletions crates/uv-dispatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,17 @@ impl<'a> BuildContext for BuildDispatch<'a> {
}

// Install the resolved distributions.
let wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
let mut wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
if !wheels.is_empty() {
debug!(
"Installing build requirement{}: {}",
if wheels.len() == 1 { "" } else { "s" },
wheels.iter().map(ToString::to_string).join(", ")
);
Installer::new(venv)
wheels = Installer::new(venv)
.with_link_mode(self.link_mode)
.install(&wheels)
.install(wheels)
.await
.context("Failed to install build dependencies")?;
}

Expand Down
102 changes: 73 additions & 29 deletions crates/uv-installer/src/installer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
use std::convert;

use anyhow::{Context, Error, Result};
use install_wheel_rs::{linker::LinkMode, Layout};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use tokio::sync::oneshot;
use tracing::instrument;

use distribution_types::CachedDist;
use uv_python::PythonEnvironment;

pub struct Installer<'a> {
venv: &'a PythonEnvironment,
link_mode: install_wheel_rs::linker::LinkMode,
link_mode: LinkMode,
reporter: Option<Box<dyn Reporter>>,
installer_name: Option<String>,
}
Expand All @@ -17,15 +21,15 @@ impl<'a> Installer<'a> {
pub fn new(venv: &'a PythonEnvironment) -> Self {
Self {
venv,
link_mode: install_wheel_rs::linker::LinkMode::default(),
link_mode: LinkMode::default(),
reporter: None,
installer_name: Some("uv".to_string()),
}
}

/// Set the [`LinkMode`][`install_wheel_rs::linker::LinkMode`] to use for this installer.
#[must_use]
pub fn with_link_mode(self, link_mode: install_wheel_rs::linker::LinkMode) -> Self {
pub fn with_link_mode(self, link_mode: LinkMode) -> Self {
Self { link_mode, ..self }
}

Expand All @@ -49,35 +53,75 @@ impl<'a> Installer<'a> {

/// Install a set of wheels into a Python virtual environment.
#[instrument(skip_all, fields(num_wheels = %wheels.len()))]
pub fn install(self, wheels: &[CachedDist]) -> Result<()> {
let layout = self.venv.interpreter().layout();
tokio::task::block_in_place(|| {
wheels.par_iter().try_for_each(|wheel| {
install_wheel_rs::linker::install_wheel(
&layout,
wheel.path(),
wheel.filename(),
wheel
.parsed_url()?
.as_ref()
.map(pypi_types::DirectUrl::try_from)
.transpose()?
.as_ref(),
self.installer_name.as_deref(),
self.link_mode,
)
.with_context(|| format!("Failed to install: {} ({wheel})", wheel.filename()))?;

if let Some(reporter) = self.reporter.as_ref() {
reporter.on_install_progress(wheel);
}

Ok::<(), Error>(())
})
})
pub async fn install(self, wheels: Vec<CachedDist>) -> Result<Vec<CachedDist>> {
let (tx, rx) = oneshot::channel();

let Self {
venv,
link_mode,
reporter,
installer_name,
} = self;
let layout = venv.interpreter().layout();

rayon::spawn(move || {
let result = install(wheels, layout, installer_name, link_mode, reporter);
tx.send(result).unwrap();
});

rx.await
.map_err(|_| anyhow::anyhow!("`install_blocking` task panicked"))
.and_then(convert::identity)
}

/// Install a set of wheels into a Python virtual environment synchronously.
#[instrument(skip_all, fields(num_wheels = %wheels.len()))]
pub fn install_blocking(self, wheels: Vec<CachedDist>) -> Result<Vec<CachedDist>> {
install(
wheels,
self.venv.interpreter().layout(),
self.installer_name,
self.link_mode,
self.reporter,
)
}
}

/// Install a set of wheels into a Python virtual environment synchronously.
#[instrument(skip_all, fields(num_wheels = %wheels.len()))]
fn install(
wheels: Vec<CachedDist>,
layout: Layout,
installer_name: Option<String>,
link_mode: LinkMode,
reporter: Option<Box<dyn Reporter>>,
) -> Result<Vec<CachedDist>> {
wheels.par_iter().try_for_each(|wheel| {
install_wheel_rs::linker::install_wheel(
&layout,
wheel.path(),
wheel.filename(),
wheel
.parsed_url()?
.as_ref()
.map(pypi_types::DirectUrl::try_from)
.transpose()?
.as_ref(),
installer_name.as_deref(),
link_mode,
)
.with_context(|| format!("Failed to install: {} ({wheel})", wheel.filename()))?;

if let Some(reporter) = reporter.as_ref() {
reporter.on_install_progress(wheel);
}

Ok::<(), Error>(())
})?;

Ok(wheels)
}

pub trait Reporter: Send + Sync {
/// Callback to invoke when a dependency is installed.
fn on_install_progress(&self, wheel: &CachedDist);
Expand Down
9 changes: 6 additions & 3 deletions crates/uv/src/commands/pip/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,13 +442,16 @@ pub(crate) async fn install(
}

// Install the resolved distributions.
let wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
let mut wheels = wheels.into_iter().chain(cached).collect::<Vec<_>>();
if !wheels.is_empty() {
let start = std::time::Instant::now();
uv_installer::Installer::new(venv)
wheels = uv_installer::Installer::new(venv)
.with_link_mode(link_mode)
.with_reporter(InstallReporter::from(printer).with_length(wheels.len() as u64))
.install(&wheels)?;
// This technically can block the runtime, but we are on the main thread and
// have no other running tasks at this point, so this lets us avoid spawning a blocking
// task.
.install_blocking(wheels)?;

let s = if wheels.len() == 1 { "" } else { "s" };
writeln!(
Expand Down
4 changes: 2 additions & 2 deletions crates/uv/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ fn main() -> ExitCode {
let result = if let Ok(stack_size) = env::var("UV_STACK_SIZE") {
let stack_size = stack_size.parse().expect("Invalid stack size");
let tokio_main = move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.thread_stack_size(stack_size)
.build()
Expand All @@ -1035,7 +1035,7 @@ fn main() -> ExitCode {
.join()
.expect("Tokio executor failed, was there a panic?")
} else {
let runtime = tokio::runtime::Builder::new_multi_thread()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed building the Runtime");
Expand Down

0 comments on commit 79e313a

Please sign in to comment.