diff --git a/e2e/tasks/test_task_sequence_jobs1 b/e2e/tasks/test_task_sequence_jobs1 new file mode 100644 index 0000000000..77cb6bece7 --- /dev/null +++ b/e2e/tasks/test_task_sequence_jobs1 @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Verify that running a task sequence with --jobs=1 executes tasks and exits + +cat <mise.toml +[tasks.a] +run = "echo A" + +[tasks.b] +run = "echo B" + +[tasks.sequence] +run = [ + { task = "a" }, + { task = "b" }, +] +EOF + +output=$(mise run --jobs=1 sequence 2>&1) && exit_code=0 || exit_code=$? + +if [ "$exit_code" -ne 0 ]; then + echo "Expected exit code 0, got $exit_code" + echo "$output" + exit 1 +fi + +if ! echo "$output" | grep -q "A"; then + echo "Did not find output from task a" + echo "$output" + exit 1 +fi + +if ! echo "$output" | grep -q "B"; then + echo "Did not find output from task b" + echo "$output" + exit 1 +fi + +echo "jobs=1 sequence ran successfully" diff --git a/e2e/tasks/test_task_sequence_keep_order b/e2e/tasks/test_task_sequence_keep_order new file mode 100644 index 0000000000..b6f76e408c --- /dev/null +++ b/e2e/tasks/test_task_sequence_keep_order @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Verify that keep-order output mode does not panic and preserves order + +cat <mise.toml +[tasks.a] +run = "printf 'A1\nA2'" + +[tasks.b] +run = "printf 'B1\nB2'" + +[tasks.sequence] +run = [ + { task = "a" }, + { task = "b" }, +] +EOF + +output=$(MISE_TASK_OUTPUT=keep-order mise run sequence 2>&1) && exit_code=0 || exit_code=$? + +if [ "$exit_code" -ne 0 ]; then + echo "Expected exit code 0, got $exit_code" + echo "$output" + exit 1 +fi + +# Ensure there is no panic message +if echo "$output" | grep -qi "panicked"; then + echo "Found panic in output" + echo "$output" + exit 1 +fi + +# Verify order: all A lines before B lines +apos=$(echo "$output" | grep -n "^\\[a\\] A1$" | cut -d: -f1 | head -n1 || true) +if [ -z "$apos" ]; then + echo "Did not find [a] A1" + echo "$output" + exit 1 +fi + +bpos=$(echo "$output" | grep -n "^\\[b\\] B1$" | cut -d: -f1 | head -n1 || true) +if [ -z "$bpos" ]; then + echo "Did not find [b] B1" + echo "$output" + exit 1 +fi + +if [ "$apos" -ge "$bpos" ]; then + echo "Output for task a did not appear before task b" + echo "$output" + exit 1 +fi + +echo "keep-order sequence ran successfully" diff --git a/src/cli/run.rs b/src/cli/run.rs index bf75a9f89e..4060faeb9e 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -212,6 +212,14 @@ pub struct Run { type KeepOrderOutputs = (Vec<(String, String)>, Vec<(String, String)>); +struct SpawnCtx { + semaphore: Arc, + config: Arc, + sched_tx: Arc>)>>, + jset: Arc>>>, + in_flight: Arc, +} + impl Run { pub async fn run(mut self) -> Result<()> { let config = Config::get().await?; @@ -413,58 +421,19 @@ impl Run { if this.is_stopping() && !this.continue_on_error { break; } - let jset = jset.clone(); - let this_ = this.clone(); - let wait_start = std::time::Instant::now(); - let permit = semaphore.clone().acquire_owned().await?; - trace!( - "semaphore acquired for {} after {}ms", - task.name, - wait_start.elapsed().as_millis() - ); - let config = config.clone(); - let sched_tx = sched_tx.clone(); - in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let in_flight_c = in_flight.clone(); - trace!("running task: {task}"); - jset.lock().await.spawn(async move { - let _permit = permit; - let result = - this_.run_task_sched(&task, &config, sched_tx.clone()).await; - if let Err(err) = &result { - let status = Error::get_exit_status(err); - if !this_.is_stopping() && status.is_none() { - let prefix = task.estyled_prefix(); - if Settings::get().verbose { - this_.eprint( - &task, - &prefix, - &format!("{} {err:?}", style::ered("ERROR")), - ); - } else { - this_.eprint( - &task, - &prefix, - &format!("{} {err}", style::ered("ERROR")), - ); - let mut current_err = err.source(); - while let Some(e) = current_err { - this_.eprint( - &task, - &prefix, - &format!("{} {e}", style::ered("ERROR")), - ); - current_err = e.source(); - } - }; - } - this_.add_failed_task(task.clone(), status); - } - deps_for_remove.lock().await.remove(&task); - trace!("deps removed: {} {}", task.name, task.args.join(" ")); - in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - result - }); + Self::spawn_sched_job( + this.clone(), + task, + deps_for_remove, + SpawnCtx { + semaphore: semaphore.clone(), + config: config.clone(), + sched_tx: sched_tx.clone(), + jset: jset.clone(), + in_flight: in_flight.clone(), + }, + ) + .await?; } Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break, @@ -486,45 +455,19 @@ impl Run { if let Some((task, deps_for_remove)) = m { trace!("scheduler received: {} {}", task.name, task.args.join(" ")); if this.is_stopping() && !this.continue_on_error { break; } - let jset = jset.clone(); - let this_ = this.clone(); - let wait_start = std::time::Instant::now(); - let permit = semaphore.clone().acquire_owned().await?; - trace!( - "semaphore acquired for {} after {}ms", - task.name, - wait_start.elapsed().as_millis() - ); - let config = config.clone(); - let sched_tx = sched_tx.clone(); - in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let in_flight_c = in_flight.clone(); - trace!("running task: {task}"); - jset.lock().await.spawn(async move { - let _permit = permit; - let result = this_.run_task_sched(&task, &config, sched_tx.clone()).await; - if let Err(err) = &result { - let status = Error::get_exit_status(err); - if !this_.is_stopping() && status.is_none() { - let prefix = task.estyled_prefix(); - if Settings::get().verbose { - this_.eprint(&task, &prefix, &format!("{} {err:?}", style::ered("ERROR"))); - } else { - this_.eprint(&task, &prefix, &format!("{} {err}", style::ered("ERROR"))); - let mut current_err = err.source(); - while let Some(e) = current_err { - this_.eprint(&task, &prefix, &format!("{} {e}", style::ered("ERROR"))); - current_err = e.source(); - } - }; - } - this_.add_failed_task(task.clone(), status); - } - deps_for_remove.lock().await.remove(&task); - trace!("deps removed: {} {}", task.name, task.args.join(" ")); - in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - result - }); + Self::spawn_sched_job( + this.clone(), + task, + deps_for_remove, + SpawnCtx { + semaphore: semaphore.clone(), + config: config.clone(), + sched_tx: sched_tx.clone(), + jset: jset.clone(), + in_flight: in_flight.clone(), + }, + ) + .await?; } else { // channel closed; rely on main_done/in_flight to exit soon } @@ -587,6 +530,68 @@ impl Run { Ok(()) } + async fn spawn_sched_job( + this: Arc, + task: Task, + deps_for_remove: Arc>, + ctx: SpawnCtx, + ) -> Result<()> { + let needs_permit = Self::task_needs_permit(&task); + let permit_opt = if needs_permit { + let wait_start = std::time::Instant::now(); + let p = Some(ctx.semaphore.clone().acquire_owned().await?); + trace!( + "semaphore acquired for {} after {}ms", + task.name, + wait_start.elapsed().as_millis() + ); + p + } else { + trace!("no semaphore needed for orchestrator task: {}", task.name); + None + }; + + ctx.in_flight + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let in_flight_c = ctx.in_flight.clone(); + trace!("running task: {task}"); + ctx.jset.lock().await.spawn(async move { + let _permit = permit_opt; + let result = this + .run_task_sched(&task, &ctx.config, ctx.sched_tx.clone()) + .await; + if let Err(err) = &result { + let status = Error::get_exit_status(err); + if !this.is_stopping() && status.is_none() { + let prefix = task.estyled_prefix(); + if Settings::get().verbose { + this.eprint(&task, &prefix, &format!("{} {err:?}", style::ered("ERROR"))); + } else { + this.eprint(&task, &prefix, &format!("{} {err}", style::ered("ERROR"))); + let mut current_err = err.source(); + while let Some(e) = current_err { + this.eprint(&task, &prefix, &format!("{} {e}", style::ered("ERROR"))); + current_err = e.source(); + } + }; + } + this.add_failed_task(task.clone(), status); + } + deps_for_remove.lock().await.remove(&task); + trace!("deps removed: {} {}", task.name, task.args.join(" ")); + in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + result + }); + + Ok(()) + } + + fn task_needs_permit(task: &Task) -> bool { + // Only shell/script tasks execute external commands and need a concurrency slot. + // Orchestrator-only tasks (pure groups of sub-tasks) do not. + task.file.is_some() || !task.run_script_strings().is_empty() + } + fn maybe_print_failure_summary(&self) { if !self.continue_on_error { return; @@ -1032,22 +1037,22 @@ impl Run { } TaskOutput::KeepOrder => { cmd = cmd.with_on_stdout(|line| { - self.keep_order_output - .lock() - .unwrap() - .get_mut(task) - .unwrap() - .0 - .push((prefix.to_string(), line)); + let mut map = self.keep_order_output.lock().unwrap(); + if !map.contains_key(task) { + map.insert(task.clone(), Default::default()); + } + if let Some(entry) = map.get_mut(task) { + entry.0.push((prefix.to_string(), line)); + } }); cmd = cmd.with_on_stderr(|line| { - self.keep_order_output - .lock() - .unwrap() - .get_mut(task) - .unwrap() - .1 - .push((prefix.to_string(), line)); + let mut map = self.keep_order_output.lock().unwrap(); + if !map.contains_key(task) { + map.insert(task.clone(), Default::default()); + } + if let Some(entry) = map.get_mut(task) { + entry.1.push((prefix.to_string(), line)); + } }); } TaskOutput::Replacing => { diff --git a/src/ui/ctrlc.rs b/src/ui/ctrlc.rs index 82c215489d..76843c5fbc 100644 --- a/src/ui/ctrlc.rs +++ b/src/ui/ctrlc.rs @@ -10,19 +10,21 @@ static SHOW_CURSOR: AtomicBool = AtomicBool::new(false); pub fn init() { tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - if SHOW_CURSOR.load(Ordering::Relaxed) { - let _ = Term::stderr().show_cursor(); - } - CmdLineRunner::kill_all(nix::sys::signal::SIGINT); - if EXIT.swap(true, Ordering::Relaxed) { - debug!("Ctrl-C pressed, exiting..."); - exit(1); - } else { - eprintln!(); - warn!( - "Ctrl-C pressed, please wait for tasks to finish or press Ctrl-C again to force exit" - ); + loop { + tokio::signal::ctrl_c().await.unwrap(); + if SHOW_CURSOR.load(Ordering::Relaxed) { + let _ = Term::stderr().show_cursor(); + } + CmdLineRunner::kill_all(nix::sys::signal::SIGINT); + if EXIT.swap(true, Ordering::Relaxed) { + debug!("Ctrl-C pressed, exiting..."); + exit(1); + } else { + eprintln!(); + warn!( + "Ctrl-C pressed, please wait for tasks to finish or press Ctrl-C again to force exit" + ); + } } }); }