Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
389 changes: 218 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ crossbeam-channel = "0.5.14"
csv = "1.3.1"
ctrlc = "3.4.5"
dashmap = "6.1.0"
deno_task_shell = "0.24.0"
deno_task_shell = "0.26.0"
derive_more = "2.0.1"
dialoguer = "0.11.0"
digest = "0.10"
Expand Down
134 changes: 102 additions & 32 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ use std::{
convert::identity,
ffi::OsString,
string::String,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};

use clap::Parser;
use deno_task_shell::KillSignal;
use dialoguer::theme::ColorfulTheme;
use fancy_display::FancyDisplay;
use itertools::Itertools;
Expand All @@ -18,6 +15,7 @@ use pixi_config::{ConfigCli, ConfigCliActivation};
use pixi_manifest::{FeaturesExt, TaskName};
use rattler_conda_types::Platform;
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use tracing::Level;

use super::cli_config::LockFileUpdateConfig;
Expand Down Expand Up @@ -134,16 +132,12 @@ pub async fn execute(args: Args) -> miette::Result<()> {
})
.await?;

let ctrlc_should_exit_process = Arc::new(AtomicBool::new(true));
let ctrlc_should_exit_process_clone = Arc::clone(&ctrlc_should_exit_process);

ctrlc::set_handler(move || {
reset_cursor();
if ctrlc_should_exit_process_clone.load(Ordering::Relaxed) {
exit_process_on_sigint();
// Spawn a task that listens for ctrl+c and resets the cursor.
tokio::spawn(async {
if tokio::signal::ctrl_c().await.is_ok() {
reset_cursor();
}
})
.into_diagnostic()?;
});

// Construct a task graph from the input arguments
let search_environment = SearchEnvironments::from_opt_env(
Expand Down Expand Up @@ -173,6 +167,10 @@ pub async fn execute(args: Args) -> miette::Result<()> {
// task.
let mut task_idx = 0;
let mut task_envs = HashMap::new();
let signal = KillSignal::default();
// make sure that child processes are killed when pixi stops
let _drop_guard = signal.clone().drop_guard();

for task_id in task_graph.topological_order() {
let executable_task = ExecutableTask::from_task_graph(&task_graph, task_id);

Expand Down Expand Up @@ -279,8 +277,6 @@ pub async fn execute(args: Args) -> miette::Result<()> {
}
};

ctrlc_should_exit_process.store(false, Ordering::Relaxed);

let task_env = task_env
.iter()
.map(|(k, v)| (OsString::from(k), OsString::from(v)))
Expand All @@ -289,7 +285,7 @@ pub async fn execute(args: Args) -> miette::Result<()> {
// Execute the task itself within the command environment. If one of the tasks
// failed with a non-zero exit code, we exit this parent process with
// the same code.
match execute_task(&executable_task, &task_env).await {
match execute_task(&executable_task, &task_env, signal.clone()).await {
Ok(_) => {
task_idx += 1;
}
Expand All @@ -302,9 +298,6 @@ pub async fn execute(args: Args) -> miette::Result<()> {
Err(err) => return Err(err.into()),
}

// Handle CTRL-C ourselves again
ctrlc_should_exit_process.store(true, Ordering::Relaxed);

// Update the task cache with the new hash
executable_task
.save_cache(lock_file.as_lock_file(), task_cache)
Expand Down Expand Up @@ -377,21 +370,22 @@ enum TaskExecutionError {
async fn execute_task(
task: &ExecutableTask<'_>,
command_env: &HashMap<OsString, OsString>,
kill_signal: KillSignal,
) -> Result<(), TaskExecutionError> {
let Some(script) = task.as_deno_script()? else {
return Ok(());
};
let cwd = task.working_directory()?;

let status_code = deno_task_shell::execute(
let execute_future = deno_task_shell::execute(
script,
command_env.clone(),
cwd,
Default::default(),
Default::default(),
)
.await;
kill_signal.clone(),
);

// Execute the process and forward signals.
let status_code = run_future_forwarding_signals(kill_signal, execute_future).await;
if status_code != 0 {
return Err(TaskExecutionError::NonZeroExitCode(status_code));
}
Expand Down Expand Up @@ -443,13 +437,89 @@ fn reset_cursor() {
let _ = term.show_cursor();
}

/// Exit the process with the appropriate exit code for a SIGINT.
fn exit_process_on_sigint() {
// https://learn.microsoft.com/en-us/cpp/c-runtime-library/signal-constants
#[cfg(target_os = "windows")]
std::process::exit(3);
// /// Exit the process with the appropriate exit code for a SIGINT.
// fn exit_process_on_sigint() {
// // https://learn.microsoft.com/en-us/cpp/c-runtime-library/signal-constants
// #[cfg(target_os = "windows")]
// std::process::exit(3);
//
// // POSIX compliant OSs: 128 + SIGINT (2)
// #[cfg(not(target_os = "windows"))]
// std::process::exit(130);
// }

/// Runs a task future forwarding any signals received to the process.
///
/// Signal listeners and ctrl+c listening will be setup.
pub async fn run_future_forwarding_signals<TOutput>(
kill_signal: KillSignal,
future: impl std::future::Future<Output = TOutput>,
) -> TOutput {
fn spawn_future_with_cancellation(
future: impl std::future::Future<Output = ()> + 'static,
token: CancellationToken,
) {
tokio::task::spawn_local(async move {
tokio::select! {
_ = future => {}
_ = token.cancelled() => {}
}
});
}

let token = CancellationToken::new();
let _token_drop_guard = token.clone().drop_guard();
let local_set = tokio::task::LocalSet::new();

local_set
.run_until(async move {
spawn_future_with_cancellation(listen_ctrl_c(kill_signal.clone()), token.clone());
#[cfg(unix)]
spawn_future_with_cancellation(listen_and_forward_all_signals(kill_signal), token);

future.await
})
.await
}

async fn listen_ctrl_c(kill_signal: KillSignal) {
while let Ok(()) = tokio::signal::ctrl_c().await {
// On windows, ctrl+c is sent to the process group, so the signal would
// have already been sent to the child process. We still want to listen
// for ctrl+c here to keep the process alive when receiving it, but no
// need to forward the signal because it's already been sent.
if !cfg!(windows) {
kill_signal.send(deno_task_shell::SignalKind::SIGINT)
}
}
}

#[cfg(unix)]
async fn listen_and_forward_all_signals(kill_signal: KillSignal) {
use futures::FutureExt;

// POSIX compliant OSs: 128 + SIGINT (2)
#[cfg(not(target_os = "windows"))]
std::process::exit(130);
use crate::signals::SIGNALS;

// listen and forward every signal we support
let mut futures = Vec::with_capacity(SIGNALS.len());
for signo in SIGNALS.iter().copied() {
if signo == libc::SIGKILL || signo == libc::SIGSTOP {
continue; // skip, can't listen to these
}

let kill_signal = kill_signal.clone();
futures.push(
async move {
let Ok(mut stream) = tokio::signal::unix::signal(signo.into()) else {
return;
};
let signal_kind = signo.into();
while let Some(()) = stream.recv().await {
kill_signal.send(signal_kind);
}
}
.boxed_local(),
)
}
futures::future::join_all(futures).await;
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod workspace;
mod reporters;

mod rlimit;
mod signals;
mod uv_reporter;
pub mod variants;

Expand Down
Loading
Loading