Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod search;
pub mod self_update;
pub mod shell;
pub mod shell_hook;
pub mod signals;
pub mod task;
pub mod tree;
pub mod update;
Expand Down
172 changes: 127 additions & 45 deletions src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@ use std::{
convert::identity,
ffi::OsString,
string::String,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};

use clap::Parser;
use deno_task_shell::KillSignal;
use dialoguer::theme::ColorfulTheme;
use fancy_display::FancyDisplay;
use itertools::Itertools;
Expand All @@ -18,6 +15,8 @@ use pixi_config::{ConfigCli, ConfigCliActivation};
use pixi_manifest::TaskName;
use rattler_conda_types::Platform;
use thiserror::Error;
use tokio::task::LocalSet;
use tokio_util::sync::CancellationToken;
use tracing::Level;

use crate::{
Expand Down Expand Up @@ -137,16 +136,16 @@ pub async fn execute(args: Args) -> miette::Result<()> {
// dialoguer doesn't reset the cursor if it's aborted via e.g. SIGINT
// So we do it ourselves.

let ctrlc_should_exit_process = Arc::new(AtomicBool::new(true));
let ctrlc_should_exit_process_clone = Arc::clone(&ctrlc_should_exit_process);
// let ctrlc_should_exit_process = Arc::new(AtomicBool::new(true));
// let ctrlc_should_exit_process_clone = Arc::clone(&ctrlc_should_exit_process);

ctrlc::set_handler(move || {
reset_cursor();
if ctrlc_should_exit_process_clone.load(Ordering::Relaxed) {
exit_process_on_sigint();
}
})
.into_diagnostic()?;
// ctrlc::set_handler(move || {
// reset_cursor();
// if ctrlc_should_exit_process_clone.load(Ordering::Relaxed) {
// exit_process_on_sigint();
// }
// })
// .into_diagnostic()?;

// Construct a task graph from the input arguments
let search_environment = SearchEnvironments::from_opt_env(
Expand Down Expand Up @@ -280,7 +279,7 @@ pub async fn execute(args: Args) -> miette::Result<()> {
}
};

ctrlc_should_exit_process.store(false, Ordering::Relaxed);
// ctrlc_should_exit_process.store(false, Ordering::Relaxed);

let task_env = task_env
.iter()
Expand All @@ -304,7 +303,7 @@ pub async fn execute(args: Args) -> miette::Result<()> {
}

// Handle CTRL-C ourselves again
ctrlc_should_exit_process.store(true, Ordering::Relaxed);
// ctrlc_should_exit_process.store(true, Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove commented code.


// Update the task cache with the new hash
executable_task
Expand Down Expand Up @@ -371,6 +370,73 @@ enum TaskExecutionError {
UnsupportedPlatformError(#[from] UnsupportedPlatformError),
}

/// Runs a deno task future forwarding any signals received
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deno?

/// to the process.
///
/// Signal listeners and ctrl+c listening will be setup.
pub async fn run_future_forwarding_signals<TOutput>(
kill_signal: KillSignal,
future: impl std::future::Future<Output = TOutput>,
) -> TOutput {
fn spawn_future_with_cancellation(
future: impl std::future::Future<Output = ()> + 'static,
token: CancellationToken,
) {
// Spawn the future with cancellation token
tokio::task::spawn_local(async move {
// Wait for the future to complete or the token to be cancelled
tokio::select! {
_ = future => {},
_ = token.cancelled() => {},
}
});
}

let token = CancellationToken::new();
let _token_drop_guard = token.clone().drop_guard();
let _drop_guard = kill_signal.clone().drop_guard();

// spawn_future_with_cancellation(
// listen_ctrl_c(kill_signal.clone()),
// token.clone(),
// );
#[cfg(unix)]
spawn_future_with_cancellation(listen_and_forward_all_signals(kill_signal), token);

future.await
}

#[cfg(unix)]
async fn listen_and_forward_all_signals(kill_signal: KillSignal) {
// listen and forward every signal we support
use crate::cli::signals::SIGNAL_NUMS;
use futures::FutureExt as _;

let mut futures = Vec::with_capacity(SIGNAL_NUMS.len());
for signo in SIGNAL_NUMS.iter().copied() {
if signo == libc::SIGKILL || signo == libc::SIGSTOP {
continue; // skip, can't listen to these
}

let kill_signal = kill_signal.clone();
futures.push(
async move {
let Ok(mut stream) =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::from_raw(signo))
else {
return;
};
let signal_kind: deno_task_shell::SignalKind = signo.into();
while let Some(()) = stream.recv().await {
kill_signal.send(signal_kind);
}
}
.boxed_local(),
)
}
futures::future::join_all(futures).await;
}

/// Called to execute a single command.
///
/// This function is called from [`execute`].
Expand All @@ -383,14 +449,30 @@ async fn execute_task(
};
let cwd = task.working_directory()?;

let status_code = deno_task_shell::execute(
script,
command_env.clone(),
cwd,
Default::default(),
Default::default(),
)
.await;
let local = LocalSet::new();

// Create a KillSignal for managing child processes
let kill_signal = KillSignal::default();

let kill_signal_clone = kill_signal.clone();
local.spawn_local(async move {
listen_and_forward_all_signals(kill_signal_clone).await;
});

let command_env = command_env.clone();

let status_code = local
.run_until(async move {
deno_task_shell::execute(
script,
command_env.clone(),
cwd,
Default::default(),
kill_signal,
)
.await
})
.await;

if status_code != 0 {
return Err(TaskExecutionError::NonZeroExitCode(status_code));
Expand Down Expand Up @@ -431,25 +513,25 @@ fn disambiguate_task_interactive<'p>(
.map(|idx| problem.environments[idx].clone())
}

/// `dialoguer` doesn't clean up your term if it's aborted via e.g. `SIGINT` or
/// other exceptions: https://github.com/console-rs/dialoguer/issues/188.
///
/// `dialoguer`, as a library, doesn't want to mess with signal handlers,
/// but we, as an application, are free to mess with signal handlers if we feel
/// like it, since we own the process.
/// This function was taken from https://github.com/dnjstrom/git-select-branch/blob/16c454624354040bc32d7943b9cb2e715a5dab92/src/main.rs#L119
fn reset_cursor() {
let term = console::Term::stdout();
let _ = term.show_cursor();
}

/// Exit the process with the appropriate exit code for a SIGINT.
fn exit_process_on_sigint() {
// https://learn.microsoft.com/en-us/cpp/c-runtime-library/signal-constants
#[cfg(target_os = "windows")]
std::process::exit(3);

// POSIX compliant OSs: 128 + SIGINT (2)
#[cfg(not(target_os = "windows"))]
std::process::exit(130);
}
// / `dialoguer` doesn't clean up your term if it's aborted via e.g. `SIGINT` or
// / other exceptions: https://github.com/console-rs/dialoguer/issues/188.
// /
// / `dialoguer`, as a library, doesn't want to mess with signal handlers,
// / but we, as an application, are free to mess with signal handlers if we feel
// / like it, since we own the process.
// / This function was taken from https://github.com/dnjstrom/git-select-branch/blob/16c454624354040bc32d7943b9cb2e715a5dab92/src/main.rs#L119
// fn reset_cursor() {
// let term = console::Term::stdout();
// let _ = term.show_cursor();
// }
Comment on lines +523 to +526
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnt this still an issue?


// /// Exit the process with the appropriate exit code for a SIGINT.
// fn exit_process_on_sigint() {
// // https://learn.microsoft.com/en-us/cpp/c-runtime-library/signal-constants
// #[cfg(target_os = "windows")]
// std::process::exit(3);

// // POSIX compliant OSs: 128 + SIGINT (2)
// #[cfg(not(target_os = "windows"))]
// std::process::exit(130);
// }
Loading
Loading