-
-
Notifications
You must be signed in to change notification settings - Fork 961
fix(task): resolve jobs=1 hang and keep-order panic; improve Ctrl-C handling #6264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,40 @@ | ||
| #!/usr/bin/env bash | ||
| set -euo pipefail | ||
|
|
||
| # Verify that running a task sequence with --jobs=1 executes tasks and exits | ||
|
|
||
| cat <<EOF >mise.toml | ||
| [tasks.a] | ||
| run = "echo A" | ||
|
|
||
| [tasks.b] | ||
| run = "echo B" | ||
|
|
||
| [tasks.sequence] | ||
| run = [ | ||
| { task = "a" }, | ||
| { task = "b" }, | ||
| ] | ||
| EOF | ||
|
|
||
| output=$(mise run --jobs=1 sequence 2>&1) && exit_code=0 || exit_code=$? | ||
|
|
||
| if [ "$exit_code" -ne 0 ]; then | ||
| echo "Expected exit code 0, got $exit_code" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| if ! echo "$output" | grep -q "A"; then | ||
| echo "Did not find output from task a" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| if ! echo "$output" | grep -q "B"; then | ||
| echo "Did not find output from task b" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| echo "jobs=1 sequence ran successfully" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| #!/usr/bin/env bash | ||
| set -euo pipefail | ||
|
|
||
| # Verify that keep-order output mode does not panic and preserves order | ||
|
|
||
| cat <<EOF >mise.toml | ||
| [tasks.a] | ||
| run = "printf 'A1\nA2'" | ||
|
|
||
| [tasks.b] | ||
| run = "printf 'B1\nB2'" | ||
|
|
||
| [tasks.sequence] | ||
| run = [ | ||
| { task = "a" }, | ||
| { task = "b" }, | ||
| ] | ||
| EOF | ||
|
|
||
| output=$(MISE_TASK_OUTPUT=keep-order mise run sequence 2>&1) && exit_code=0 || exit_code=$? | ||
|
|
||
| if [ "$exit_code" -ne 0 ]; then | ||
| echo "Expected exit code 0, got $exit_code" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| # Ensure there is no panic message | ||
| if echo "$output" | grep -qi "panicked"; then | ||
| echo "Found panic in output" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| # Verify order: all A lines before B lines | ||
| apos=$(echo "$output" | grep -n "^\\[a\\] A1$" | cut -d: -f1 | head -n1 || true) | ||
| if [ -z "$apos" ]; then | ||
| echo "Did not find [a] A1" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| bpos=$(echo "$output" | grep -n "^\\[b\\] B1$" | cut -d: -f1 | head -n1 || true) | ||
| if [ -z "$bpos" ]; then | ||
| echo "Did not find [b] B1" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| if [ "$apos" -ge "$bpos" ]; then | ||
| echo "Output for task a did not appear before task b" | ||
| echo "$output" | ||
| exit 1 | ||
| fi | ||
|
|
||
| echo "keep-order sequence ran successfully" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -212,6 +212,14 @@ pub struct Run { | |
|
|
||
| type KeepOrderOutputs = (Vec<(String, String)>, Vec<(String, String)>); | ||
|
|
||
| struct SpawnCtx { | ||
| semaphore: Arc<Semaphore>, | ||
| config: Arc<Config>, | ||
| sched_tx: Arc<mpsc::UnboundedSender<(Task, Arc<Mutex<Deps>>)>>, | ||
| jset: Arc<Mutex<JoinSet<Result<()>>>>, | ||
| in_flight: Arc<std::sync::atomic::AtomicUsize>, | ||
| } | ||
|
|
||
| impl Run { | ||
| pub async fn run(mut self) -> Result<()> { | ||
| let config = Config::get().await?; | ||
|
|
@@ -413,58 +421,19 @@ impl Run { | |
| if this.is_stopping() && !this.continue_on_error { | ||
| break; | ||
| } | ||
| let jset = jset.clone(); | ||
| let this_ = this.clone(); | ||
| let wait_start = std::time::Instant::now(); | ||
| let permit = semaphore.clone().acquire_owned().await?; | ||
| trace!( | ||
| "semaphore acquired for {} after {}ms", | ||
| task.name, | ||
| wait_start.elapsed().as_millis() | ||
| ); | ||
| let config = config.clone(); | ||
| let sched_tx = sched_tx.clone(); | ||
| in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); | ||
| let in_flight_c = in_flight.clone(); | ||
| trace!("running task: {task}"); | ||
| jset.lock().await.spawn(async move { | ||
| let _permit = permit; | ||
| let result = | ||
| this_.run_task_sched(&task, &config, sched_tx.clone()).await; | ||
| if let Err(err) = &result { | ||
| let status = Error::get_exit_status(err); | ||
| if !this_.is_stopping() && status.is_none() { | ||
| let prefix = task.estyled_prefix(); | ||
| if Settings::get().verbose { | ||
| this_.eprint( | ||
| &task, | ||
| &prefix, | ||
| &format!("{} {err:?}", style::ered("ERROR")), | ||
| ); | ||
| } else { | ||
| this_.eprint( | ||
| &task, | ||
| &prefix, | ||
| &format!("{} {err}", style::ered("ERROR")), | ||
| ); | ||
| let mut current_err = err.source(); | ||
| while let Some(e) = current_err { | ||
| this_.eprint( | ||
| &task, | ||
| &prefix, | ||
| &format!("{} {e}", style::ered("ERROR")), | ||
| ); | ||
| current_err = e.source(); | ||
| } | ||
| }; | ||
| } | ||
| this_.add_failed_task(task.clone(), status); | ||
| } | ||
| deps_for_remove.lock().await.remove(&task); | ||
| trace!("deps removed: {} {}", task.name, task.args.join(" ")); | ||
| in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); | ||
| result | ||
| }); | ||
| Self::spawn_sched_job( | ||
| this.clone(), | ||
| task, | ||
| deps_for_remove, | ||
| SpawnCtx { | ||
| semaphore: semaphore.clone(), | ||
| config: config.clone(), | ||
| sched_tx: sched_tx.clone(), | ||
| jset: jset.clone(), | ||
| in_flight: in_flight.clone(), | ||
| }, | ||
| ) | ||
| .await?; | ||
| } | ||
| Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, | ||
| Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break, | ||
|
|
@@ -486,45 +455,19 @@ impl Run { | |
| if let Some((task, deps_for_remove)) = m { | ||
| trace!("scheduler received: {} {}", task.name, task.args.join(" ")); | ||
| if this.is_stopping() && !this.continue_on_error { break; } | ||
| let jset = jset.clone(); | ||
| let this_ = this.clone(); | ||
| let wait_start = std::time::Instant::now(); | ||
| let permit = semaphore.clone().acquire_owned().await?; | ||
| trace!( | ||
| "semaphore acquired for {} after {}ms", | ||
| task.name, | ||
| wait_start.elapsed().as_millis() | ||
| ); | ||
| let config = config.clone(); | ||
| let sched_tx = sched_tx.clone(); | ||
| in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); | ||
| let in_flight_c = in_flight.clone(); | ||
| trace!("running task: {task}"); | ||
| jset.lock().await.spawn(async move { | ||
| let _permit = permit; | ||
| let result = this_.run_task_sched(&task, &config, sched_tx.clone()).await; | ||
| if let Err(err) = &result { | ||
| let status = Error::get_exit_status(err); | ||
| if !this_.is_stopping() && status.is_none() { | ||
| let prefix = task.estyled_prefix(); | ||
| if Settings::get().verbose { | ||
| this_.eprint(&task, &prefix, &format!("{} {err:?}", style::ered("ERROR"))); | ||
| } else { | ||
| this_.eprint(&task, &prefix, &format!("{} {err}", style::ered("ERROR"))); | ||
| let mut current_err = err.source(); | ||
| while let Some(e) = current_err { | ||
| this_.eprint(&task, &prefix, &format!("{} {e}", style::ered("ERROR"))); | ||
| current_err = e.source(); | ||
| } | ||
| }; | ||
| } | ||
| this_.add_failed_task(task.clone(), status); | ||
| } | ||
| deps_for_remove.lock().await.remove(&task); | ||
| trace!("deps removed: {} {}", task.name, task.args.join(" ")); | ||
| in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); | ||
| result | ||
| }); | ||
| Self::spawn_sched_job( | ||
| this.clone(), | ||
| task, | ||
| deps_for_remove, | ||
| SpawnCtx { | ||
| semaphore: semaphore.clone(), | ||
| config: config.clone(), | ||
| sched_tx: sched_tx.clone(), | ||
| jset: jset.clone(), | ||
| in_flight: in_flight.clone(), | ||
| }, | ||
| ) | ||
| .await?; | ||
| } else { | ||
| // channel closed; rely on main_done/in_flight to exit soon | ||
| } | ||
|
|
@@ -587,6 +530,68 @@ impl Run { | |
| Ok(()) | ||
| } | ||
|
|
||
| async fn spawn_sched_job( | ||
| this: Arc<Self>, | ||
| task: Task, | ||
| deps_for_remove: Arc<Mutex<Deps>>, | ||
| ctx: SpawnCtx, | ||
| ) -> Result<()> { | ||
| let needs_permit = Self::task_needs_permit(&task); | ||
| let permit_opt = if needs_permit { | ||
| let wait_start = std::time::Instant::now(); | ||
| let p = Some(ctx.semaphore.clone().acquire_owned().await?); | ||
| trace!( | ||
| "semaphore acquired for {} after {}ms", | ||
| task.name, | ||
| wait_start.elapsed().as_millis() | ||
| ); | ||
| p | ||
| } else { | ||
| trace!("no semaphore needed for orchestrator task: {}", task.name); | ||
| None | ||
| }; | ||
|
|
||
| ctx.in_flight | ||
| .fetch_add(1, std::sync::atomic::Ordering::SeqCst); | ||
| let in_flight_c = ctx.in_flight.clone(); | ||
| trace!("running task: {task}"); | ||
| ctx.jset.lock().await.spawn(async move { | ||
| let _permit = permit_opt; | ||
| let result = this | ||
| .run_task_sched(&task, &ctx.config, ctx.sched_tx.clone()) | ||
| .await; | ||
| if let Err(err) = &result { | ||
| let status = Error::get_exit_status(err); | ||
| if !this.is_stopping() && status.is_none() { | ||
| let prefix = task.estyled_prefix(); | ||
| if Settings::get().verbose { | ||
| this.eprint(&task, &prefix, &format!("{} {err:?}", style::ered("ERROR"))); | ||
| } else { | ||
| this.eprint(&task, &prefix, &format!("{} {err}", style::ered("ERROR"))); | ||
| let mut current_err = err.source(); | ||
| while let Some(e) = current_err { | ||
| this.eprint(&task, &prefix, &format!("{} {e}", style::ered("ERROR"))); | ||
| current_err = e.source(); | ||
| } | ||
| }; | ||
| } | ||
| this.add_failed_task(task.clone(), status); | ||
| } | ||
| deps_for_remove.lock().await.remove(&task); | ||
| trace!("deps removed: {} {}", task.name, task.args.join(" ")); | ||
| in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); | ||
| result | ||
| }); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn task_needs_permit(task: &Task) -> bool { | ||
| // Only shell/script tasks execute external commands and need a concurrency slot. | ||
| // Orchestrator-only tasks (pure groups of sub-tasks) do not. | ||
| task.file.is_some() || !task.run_script_strings().is_empty() | ||
| } | ||
|
|
||
| fn maybe_print_failure_summary(&self) { | ||
| if !self.continue_on_error { | ||
| return; | ||
|
|
@@ -1032,22 +1037,22 @@ impl Run { | |
| } | ||
| TaskOutput::KeepOrder => { | ||
| cmd = cmd.with_on_stdout(|line| { | ||
| self.keep_order_output | ||
| .lock() | ||
| .unwrap() | ||
| .get_mut(task) | ||
| .unwrap() | ||
| .0 | ||
| .push((prefix.to_string(), line)); | ||
| let mut map = self.keep_order_output.lock().unwrap(); | ||
| if !map.contains_key(task) { | ||
| map.insert(task.clone(), Default::default()); | ||
| } | ||
| if let Some(entry) = map.get_mut(task) { | ||
| entry.0.push((prefix.to_string(), line)); | ||
| } | ||
| }); | ||
| cmd = cmd.with_on_stderr(|line| { | ||
| self.keep_order_output | ||
| .lock() | ||
| .unwrap() | ||
| .get_mut(task) | ||
| .unwrap() | ||
| .1 | ||
| .push((prefix.to_string(), line)); | ||
| let mut map = self.keep_order_output.lock().unwrap(); | ||
| if !map.contains_key(task) { | ||
| map.insert(task.clone(), Default::default()); | ||
| } | ||
| if let Some(entry) = map.get_mut(task) { | ||
| entry.1.push((prefix.to_string(), line)); | ||
| } | ||
|
Comment on lines
+1041
to
+1055
|
||
| }); | ||
| } | ||
| TaskOutput::Replacing => { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider using a more explicit boolean expression instead of relying on double negation. The logic would be clearer as
task.file.is_some() || task.run_script_strings().is_empty() == falseor restructuring to avoid the negation entirely.