From b793988afabcb34bc7d51271225043011186ed91 Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Sat, 6 Sep 2025 14:05:35 -0500 Subject: [PATCH 1/6] feat(task): sub-tasks in run lists --- e2e/tasks/test_task_run_groups | 30 ++++++++ src/cli/mcp.rs | 2 +- src/cli/run.rs | 101 ++++++++++++++++++++++++-- src/cli/tasks/info.rs | 5 +- src/cli/tasks/ls.rs | 2 +- src/config/config_file/mise_toml.rs | 4 +- src/task/deps.rs | 58 ++++++++++++++- src/task/mod.rs | 105 ++++++++++++++++++++++++---- 8 files changed, 280 insertions(+), 27 deletions(-) create mode 100644 e2e/tasks/test_task_run_groups diff --git a/e2e/tasks/test_task_run_groups b/e2e/tasks/test_task_run_groups new file mode 100644 index 0000000000..fed322c9e4 --- /dev/null +++ b/e2e/tasks/test_task_run_groups @@ -0,0 +1,30 @@ +#!/usr/bin/env bash + +cat <mise.toml +[tasks.t1] +run = 'echo one' + +[tasks.t2] +run = 'echo two' + +[tasks.t3] +run = 'echo three' + +[tasks.grouped] +run = [ + { task = 't1' }, + { tasks = ['t2','t3'] }, + 'echo end', +] +EOF + +# All outputs should appear +assert_contains "mise run grouped" "one" +assert_contains "mise run grouped" "two" +assert_contains "mise run grouped" "three" +assert_contains "mise run grouped" "end" + +# Ensure trailing script runs after grouped tasks complete +assert_matches "mise run grouped" "one(.|\n)*end" +assert_matches "mise run grouped" "two(.|\n)*end" +assert_matches "mise run grouped" "three(.|\n)*end" diff --git a/src/cli/mcp.rs b/src/cli/mcp.rs index 4fe1ab722f..7ca7d4e6ec 100644 --- a/src/cli/mcp.rs +++ b/src/cli/mcp.rs @@ -209,7 +209,7 @@ impl ServerHandler for MiseServer { "quiet": task.quiet, "silent": task.silent, "tools": task.tools.clone(), - "run": task.run.clone(), + "run": task.run_script_strings(), "usage": task.usage.clone(), }) }).collect(); diff --git a/src/cli/run.rs b/src/cli/run.rs index 2e920eece2..901c8f704e 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -337,7 +337,7 @@ impl Run { trace!("running task: {task}"); jset.lock().await.spawn(async move { let _permit = permit; - let result = this_.run_task(&task, &config).await; + let result = this_.run_task_with_inject(&task, &config, tasks.clone()).await; if let Err(err) = &result { let status = Error::get_exit_status(err); if !this_.is_stopping() && status.is_none() { @@ -513,13 +513,13 @@ impl Run { self.parse_usage_spec_and_init_env(config, task, &mut env, get_args) .await?; - for (script, args) in rendered_run_scripts { - self.exec_script(&script, &args, task, &env, &prefix) - .await?; - } + self.exec_task_run_entries(config, task, &env, &prefix, rendered_run_scripts) + .await?; } - if self.task_timings() && (task.file.as_ref().is_some() || !task.run().is_empty()) { + if self.task_timings() + && (task.file.as_ref().is_some() || !task.run_script_strings().is_empty()) + { self.eprint( task, &prefix, @@ -532,6 +532,88 @@ impl Run { Ok(()) } + async fn exec_task_run_entries( + &self, + config: &Arc, + task: &Task, + env: &BTreeMap, + prefix: &str, + rendered_scripts: Vec<(String, Vec)>, + ) -> Result<()> { + use crate::task::RunEntry; + let mut script_iter = rendered_scripts.into_iter(); + for entry in task.run() { + match entry { + RunEntry::Script(_) => { + if let Some((script, args)) = script_iter.next() { + self.exec_script(&script, &args, task, env, prefix).await?; + } + } + RunEntry::SingleTask { task: spec } => { + self.exec_single_task_spec(config, spec).await?; + } + RunEntry::TaskGroup { tasks } => { + self.exec_task_group_specs(config, tasks) + .await?; + } + } + } + Ok(()) + } + + async fn exec_single_task_spec( + &self, + config: &Arc, + spec: &str, + ) -> Result<()> { + let mise_bin = env::MISE_BIN.display().to_string(); + #[cfg(unix)] + let cmd = format!("{} run {}", shell_words::quote(&mise_bin), spec); + #[cfg(windows)] + let cmd = format!("{} run {}", mise_bin, spec); + // Execute as a shell script so output/prefix handling remains consistent + let args: Vec = vec![]; + // Build a fake task prefix using the parent task's, we already have it from call site + // but exec_script requires a real Task; it's fine to reuse current task + // We need env/prefix from outer call so this function is only used inside exec_task_run_entries + // Therefore, just return the command string to caller in future refactors if needed + // For now just spawn with current settings + let dummy_task = Task::default(); + let prefix = dummy_task.estyled_prefix(); + let env_map: BTreeMap = Default::default(); + self.exec_script(&cmd, &args, &dummy_task, &env_map, &prefix).await + } + + async fn exec_task_group_specs( + &self, + config: &Arc, + specs: &[String], + ) -> Result<()> { + let mise_bin = env::MISE_BIN.display().to_string(); + #[cfg(unix)] + let cmd = { + let parts = specs + .iter() + .map(|s| format!("({} run {}) &", shell_words::quote(&mise_bin), s)) + .collect::(); + format!("{} wait", parts) + }; + #[cfg(windows)] + let cmd = { + // Fallback to sequential on Windows + specs + .iter() + .map(|s| format!("{} run {}", mise_bin, s)) + .collect::>() + .join(" & ") + }; + let args: Vec = vec![]; + let dummy_task = Task::default(); + let prefix = dummy_task.estyled_prefix(); + let env_map: BTreeMap = Default::default(); + self.exec_script(&cmd, &args, &dummy_task, &env_map, &prefix).await + } + async fn exec_script( &self, script: &str, @@ -1072,6 +1154,13 @@ impl Run { } } +fn split_task_spec(spec: &str) -> (&str, Vec) { + let mut parts = spec.split_whitespace(); + let name = parts.next().unwrap_or(""); + let args = parts.map(|s| s.to_string()).collect_vec(); + (name, args) +} + fn is_glob_pattern(path: &str) -> bool { // This is the character set used for glob // detection by glob diff --git a/src/cli/tasks/info.rs b/src/cli/tasks/info.rs index 36ca817f29..4fa6b1b53a 100644 --- a/src/cli/tasks/info.rs +++ b/src/cli/tasks/info.rs @@ -82,8 +82,9 @@ impl TasksInfo { if let Some(file) = &task.file { info::inline_section("File", display_path(file))?; } - if !task.run().is_empty() { - info::section("Run", task.run().join("\n"))?; + let run_scripts = task.run_script_strings(); + if !run_scripts.is_empty() { + info::section("Run", run_scripts.join("\n"))?; } if !task.env.is_empty() { let env_display = task diff --git a/src/cli/tasks/ls.rs b/src/cli/tasks/ls.rs index 5e28d38953..87f9208cea 100644 --- a/src/cli/tasks/ls.rs +++ b/src/cli/tasks/ls.rs @@ -173,7 +173,7 @@ impl TasksLs { "quiet": task.quiet, "silent": task.silent, "tools": task.tools, - "run": task.run(), + "run": task.run_script_strings(), "file": task.file, }) }) diff --git a/src/config/config_file/mise_toml.rs b/src/config/config_file/mise_toml.rs index 47690c610f..3b3d2fc9b8 100644 --- a/src/config/config_file/mise_toml.rs +++ b/src/config/config_file/mise_toml.rs @@ -1425,7 +1425,7 @@ impl<'de> de::Deserialize<'de> for Tasks { E: de::Error, { Ok(TaskDef(Task { - run: vec![v.to_string()], + run: vec![crate::task::RunEntry::Script(v.to_string())], ..Default::default() })) } @@ -1435,7 +1435,7 @@ impl<'de> de::Deserialize<'de> for Tasks { S: de::SeqAccess<'de>, { let mut run = vec![]; - while let Some(s) = seq.next_element::()? { + while let Some(s) = seq.next_element::()? { run.push(s); } Ok(TaskDef(Task { diff --git a/src/task/deps.rs b/src/task/deps.rs index 7c73ceec4e..72cca72947 100644 --- a/src/task/deps.rs +++ b/src/task/deps.rs @@ -8,6 +8,7 @@ use std::{ sync::Arc, }; use tokio::sync::mpsc; +use tokio::sync::Notify; #[derive(Debug, Clone)] pub struct Deps { @@ -15,9 +16,10 @@ pub struct Deps { sent: HashSet<(String, Vec)>, // tasks+args that have already started so should not run again removed: HashSet<(String, Vec)>, // tasks+args that have already finished to track if we are in an infinitve loop tx: mpsc::UnboundedSender>, + pub completed: Notify, } -fn task_key(task: &Task) -> (String, Vec) { +pub fn task_key(task: &Task) -> (String, Vec) { (task.name.clone(), task.args.clone()) } @@ -69,6 +71,7 @@ impl Deps { tx, sent, removed, + completed: Notify::new(), }) } @@ -123,6 +126,7 @@ impl Deps { let key = (task.name.clone(), task.args.clone()); self.removed.insert(key); self.emit_leaves(); + self.completed.notify_waiters(); } } @@ -155,6 +159,58 @@ impl Deps { } } } + + /// Add new tasks (and their dependencies) to the graph and emit any new leaves + pub async fn add(&mut self, config: &Arc, tasks: Vec) -> eyre::Result<()> { + // Build a dependency subgraph for the incoming tasks similar to new() + let mut indexes = HashMap::new(); + let mut stack: Vec = vec![]; + let mut seen: HashSet = HashSet::new(); + + let mut add_idx = |task: &Task, graph: &mut DiGraph| { + *indexes + .entry(task_key(task)) + .or_insert_with(|| graph.add_node(task.clone())) + }; + + for t in &tasks { + // Skip if task already exists in graph + if self.node_idx(t).is_none() { + stack.push(t.clone()); + add_idx(t, &mut self.graph); + } + } + + let all_tasks_to_run = resolve_depends(config, tasks).await?; + while let Some(a) = stack.pop() { + if seen.contains(&a) { + continue; + } + let a_idx = add_idx(&a, &mut self.graph); + let (pre, post) = a.resolve_depends(config, &all_tasks_to_run).await?; + for b in pre { + let b_idx = add_idx(&b, &mut self.graph); + self.graph.update_edge(a_idx, b_idx, ()); + if self.node_idx(&b).is_none() { + stack.push(b.clone()); + } + } + for b in post { + let b_idx = add_idx(&b, &mut self.graph); + self.graph.update_edge(b_idx, a_idx, ()); + if self.node_idx(&b).is_none() { + stack.push(b.clone()); + } + } + seen.insert(a); + } + self.emit_leaves(); + Ok(()) + } + + pub fn is_removed_key(&self, key: &(String, Vec)) -> bool { + self.removed.contains(key) + } } fn leaves(graph: &DiGraph) -> Vec { diff --git a/src/task/mod.rs b/src/task/mod.rs index 48eece1d83..1634ae5816 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -16,6 +16,7 @@ use indexmap::IndexMap; use itertools::Itertools; use petgraph::prelude::*; use serde_derive::{Deserialize, Serialize}; +use serde::de; use std::borrow::Cow; use std::cmp::Ordering; use std::collections::BTreeMap; @@ -43,6 +44,24 @@ pub use deps::Deps; use task_dep::TaskDep; use task_sources::TaskOutputs; +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] +#[serde(untagged)] +pub enum RunEntry { + /// Shell script entry + Script(String), + /// Run a single task with optional args + SingleTask { task: String }, + /// Run multiple tasks in parallel + TaskGroup { tasks: Vec }, +} + +impl std::str::FromStr for RunEntry { + type Err = String; + fn from_str(s: &str) -> Result { + Ok(RunEntry::Script(s.to_string())) + } +} + #[derive(Debug, Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct Task { @@ -94,11 +113,11 @@ pub struct Task { pub usage: String, // normal type - #[serde(default, deserialize_with = "deserialize_arr")] - pub run: Vec, + #[serde(default, deserialize_with = "deserialize_run_entries")] + pub run: Vec, - #[serde(default, deserialize_with = "deserialize_arr")] - pub run_windows: Vec, + #[serde(default, deserialize_with = "deserialize_run_entries")] + pub run_windows: Vec, // command type // pub command: Option, @@ -281,7 +300,7 @@ impl Task { format!("[{}]", self.display_name) } - pub fn run(&self) -> &Vec { + pub fn run(&self) -> &Vec { if cfg!(windows) && !self.run_windows.is_empty() { &self.run_windows } else { @@ -289,6 +308,17 @@ impl Task { } } + /// Returns only the script strings from the run entries (without rendering) + pub fn run_script_strings(&self) -> Vec { + self.run() + .iter() + .filter_map(|e| match e { + RunEntry::Script(s) => Some(s.clone()), + _ => None, + }) + .collect() + } + pub fn all_depends(&self, tasks: &BTreeMap) -> Result> { let mut depends: Vec = self .depends @@ -377,8 +407,9 @@ impl Task { .unwrap_or_default(); (spec, vec![]) } else { + let scripts_only = self.run_script_strings(); let (scripts, spec) = TaskScriptParser::new(cwd) - .parse_run_scripts(config, self, self.run(), env) + .parse_run_scripts(config, self, &scripts_only, env) .await?; (spec, scripts) }; @@ -399,8 +430,9 @@ impl Task { }) .unwrap_or_default() } else { + let scripts_only = self.run_script_strings(); TaskScriptParser::new(dir) - .parse_run_scripts_for_spec_only(config, self, self.run()) + .parse_run_scripts_for_spec_only(config, self, &scripts_only) .await? }; self.populate_spec_metadata(&mut spec); @@ -416,8 +448,9 @@ impl Task { ) -> Result)>> { let (spec, scripts) = self.parse_usage_spec(config, cwd.clone(), env).await?; if has_any_args_defined(&spec) { + let scripts_only = self.run_script_strings(); let scripts = TaskScriptParser::new(cwd) - .parse_run_scripts_with_args(config, self, self.run(), env, args, &spec) + .parse_run_scripts_with_args(config, self, &scripts_only, env, args, &spec) .await?; Ok(scripts.into_iter().map(|s| (s, vec![])).collect()) } else { @@ -426,7 +459,7 @@ impl Task { .enumerate() .map(|(i, script)| { // only pass args to the last script if no formal args are defined - match i == self.run().len() - 1 { + match i == self.run_script_strings().len() - 1 { true => (script.clone(), args.iter().cloned().collect_vec()), false => (script.clone(), vec![]), } @@ -683,13 +716,57 @@ impl Default for Task { } } +pub fn deserialize_run_entries<'de, D>(deserializer: D) -> std::result::Result, D::Error> +where + D: de::Deserializer<'de>, +{ + struct RunEntriesVisitor; + impl<'de> de::Visitor<'de> for RunEntriesVisitor { + type Value = Vec; + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("string | object | array of string/object") + } + + fn visit_str(self, v: &str) -> Result + where + E: de::Error, + { + Ok(vec![RunEntry::Script(v.to_string())]) + } + + fn visit_map(self, map: M) -> std::result::Result + where + M: de::MapAccess<'de>, + { + let entry: RunEntry = de::Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))?; + Ok(vec![entry]) + } + + fn visit_seq(self, mut seq: S) -> std::result::Result + where + S: de::SeqAccess<'de>, + { + let mut v = vec![]; + while let Some(entry) = seq.next_element::()? { + v.push(entry); + } + Ok(v) + } + } + + deserializer.deserialize_any(RunEntriesVisitor) +} + impl Display for Task { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - let cmd = if let Some(command) = self.run().first() { - Some(command.to_string()) - } else { - self.file.as_ref().map(display_path) - }; + let cmd = self + .run() + .iter() + .find_map(|e| match e { + RunEntry::Script(s) => Some(s.clone()), + _ => None, + }) + .or_else(|| self.file.as_ref().map(display_path)); if let Some(cmd) = cmd { let cmd = cmd.lines().next().unwrap_or_default(); From 4e03358e733f29fbf7755284dbd4b3c1bc00fc99 Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Sat, 6 Sep 2025 14:49:03 -0500 Subject: [PATCH 2/6] fix(cli): prevent deadlock in dynamic task scheduler; add tracing --- src/cli/run.rs | 376 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 269 insertions(+), 107 deletions(-) diff --git a/src/cli/run.rs b/src/cli/run.rs index 901c8f704e..0c171ca926 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -34,8 +34,8 @@ use itertools::Itertools; #[cfg(unix)] use nix::sys::signal::SIGTERM; use tokio::{ - sync::{Mutex, Semaphore}, - task::{JoinHandle, JoinSet}, + sync::{Mutex, Semaphore, mpsc, oneshot}, + task::JoinSet, }; use xx::regex; @@ -316,66 +316,192 @@ impl Run { .await?; let timer = std::time::Instant::now(); - let this_ = this.clone(); let jset = Arc::new(Mutex::new(JoinSet::new())); - let jset_ = jset.clone(); let config = config.clone(); - let handle: JoinHandle> = tokio::task::spawn(async move { - let tasks = Arc::new(Mutex::new(tasks)); - let semaphore = Arc::new(Semaphore::new(this_.jobs())); - let mut rx = tasks.lock().await.subscribe(); - while let Some(Some(task)) = rx.recv().await { - if this_.is_stopping() { - break; + type SchedMsg = (Task, Arc>); + let (sched_tx, mut sched_rx) = mpsc::unbounded_channel::(); + let sched_tx = Arc::new(sched_tx); + let in_flight = Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let (main_done_tx, main_done_rx) = tokio::sync::watch::channel(false); + + // Pump initial deps leaves into scheduler + let main_deps = Arc::new(Mutex::new(tasks)); + { + let sched_tx = sched_tx.clone(); + let main_deps_clone = main_deps.clone(); + // forward initial leaves synchronously + { + let mut rx = main_deps_clone.lock().await.subscribe(); + loop { + match rx.try_recv() { + Ok(Some(task)) => { + trace!( + "main deps initial leaf: {} {}", + task.name, + task.args.join(" ") + ); + let _ = sched_tx.send((task, main_deps_clone.clone())); + } + Ok(None) => { + trace!("main deps initial done"); + break; + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + break; + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + break; + } + } } - let jset = jset_.clone(); - let this_ = this_.clone(); - let permit = semaphore.clone().acquire_owned().await?; - let tasks = tasks.clone(); - let config = config.clone(); - trace!("running task: {task}"); - jset.lock().await.spawn(async move { - let _permit = permit; - let result = this_.run_task_with_inject(&task, &config, tasks.clone()).await; - if let Err(err) = &result { - let status = Error::get_exit_status(err); - if !this_.is_stopping() && status.is_none() { - // only show this if it's the first failure, or we haven't killed all the remaining tasks - // otherwise we'll get unhelpful error messages about being killed by mise which we expect - let prefix = task.estyled_prefix(); - if Settings::get().verbose { - this_.eprint( - &task, - &prefix, - &format!("{} {err:?}", style::ered("ERROR")), - ); - } else { - // Show the full error chain - this_.eprint( - &task, - &prefix, - &format!("{} {err}", style::ered("ERROR")), - ); - let mut current_err = err.source(); - while let Some(e) = current_err { - this_.eprint( - &task, - &prefix, - &format!("{} {e}", style::ered("ERROR")), - ); - current_err = e.source(); - } - }; + } + // then forward remaining leaves asynchronously + tokio::spawn(async move { + let mut rx = main_deps_clone.lock().await.subscribe(); + while let Some(msg) = rx.recv().await { + match msg { + Some(task) => { + trace!( + "main deps leaf scheduled: {} {}", + task.name, + task.args.join(" ") + ); + let _ = sched_tx.send((task, main_deps_clone.clone())); + } + None => { + trace!("main deps completed"); + let _ = main_done_tx.send(true); + break; } - this_.add_failed_task(task.clone(), status); } - tasks.lock().await.remove(&task); - result - }); + } + }); + } + + // Inline scheduler loop; drains ready tasks and exits when main deps done and in-flight is zero + let semaphore = Arc::new(Semaphore::new(this.jobs())); + let mut main_done_rx = main_done_rx.clone(); + loop { + // Drain ready tasks without awaiting + let mut drained_any = false; + loop { + match sched_rx.try_recv() { + Ok((task, deps_for_remove)) => { + drained_any = true; + trace!("scheduler received: {} {}", task.name, task.args.join(" ")); + if this.is_stopping() { + break; + } + let jset = jset.clone(); + let this_ = this.clone(); + let permit = semaphore.clone().acquire_owned().await?; + let config = config.clone(); + let sched_tx = sched_tx.clone(); + in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let in_flight_c = in_flight.clone(); + trace!("running task: {task}"); + jset.lock().await.spawn(async move { + let _permit = permit; + let result = + this_.run_task_sched(&task, &config, sched_tx.clone()).await; + if let Err(err) = &result { + let status = Error::get_exit_status(err); + if !this_.is_stopping() && status.is_none() { + let prefix = task.estyled_prefix(); + if Settings::get().verbose { + this_.eprint( + &task, + &prefix, + &format!("{} {err:?}", style::ered("ERROR")), + ); + } else { + this_.eprint( + &task, + &prefix, + &format!("{} {err}", style::ered("ERROR")), + ); + let mut current_err = err.source(); + while let Some(e) = current_err { + this_.eprint( + &task, + &prefix, + &format!("{} {e}", style::ered("ERROR")), + ); + current_err = e.source(); + } + }; + } + this_.add_failed_task(task.clone(), status); + } + deps_for_remove.lock().await.remove(&task); + trace!("deps removed: {} {}", task.name, task.args.join(" ")); + in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + result + }); + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break, + } } - Ok(()) - }); + + // Exit if main deps finished and nothing is running/queued + if *main_done_rx.borrow() + && in_flight.load(std::sync::atomic::Ordering::SeqCst) == 0 + && !drained_any + { + trace!("scheduler drain complete; exiting loop"); + break; + } + + // Await either new work or main_done change + tokio::select! { + m = sched_rx.recv() => { + if let Some((task, deps_for_remove)) = m { + trace!("scheduler received: {} {}", task.name, task.args.join(" ")); + if this.is_stopping() { break; } + let jset = jset.clone(); + let this_ = this.clone(); + let permit = semaphore.clone().acquire_owned().await?; + let config = config.clone(); + let sched_tx = sched_tx.clone(); + in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let in_flight_c = in_flight.clone(); + trace!("running task: {task}"); + jset.lock().await.spawn(async move { + let _permit = permit; + let result = this_.run_task_sched(&task, &config, sched_tx.clone()).await; + if let Err(err) = &result { + let status = Error::get_exit_status(err); + if !this_.is_stopping() && status.is_none() { + let prefix = task.estyled_prefix(); + if Settings::get().verbose { + this_.eprint(&task, &prefix, &format!("{} {err:?}", style::ered("ERROR"))); + } else { + this_.eprint(&task, &prefix, &format!("{} {err}", style::ered("ERROR"))); + let mut current_err = err.source(); + while let Some(e) = current_err { + this_.eprint(&task, &prefix, &format!("{} {e}", style::ered("ERROR"))); + current_err = e.source(); + } + }; + } + this_.add_failed_task(task.clone(), status); + } + deps_for_remove.lock().await.remove(&task); + trace!("deps removed: {} {}", task.name, task.args.join(" ")); + in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + result + }); + } else { + // channel closed; rely on main_done/in_flight to exit soon + } + } + _ = main_done_rx.changed() => { + trace!("main_done changed: {}", *main_done_rx.borrow()); + } + } + } while let Some(result) = jset.lock().await.join_next().await { if result.is_ok() || this.continue_on_error { @@ -387,7 +513,7 @@ impl Run { CmdLineRunner::kill_all(); break; } - handle.await??; + // scheduler loop done if this.output(None) == TaskOutput::KeepOrder { // TODO: display these as tasks complete in order somehow rather than waiting until everything is done @@ -439,7 +565,12 @@ impl Run { } } - async fn run_task(&self, task: &Task, config: &Arc) -> Result<()> { + async fn run_task_sched( + &self, + task: &Task, + config: &Arc, + sched_tx: Arc>)>>, + ) -> Result<()> { let prefix = task.estyled_prefix(); if Settings::get().task_skip.contains(&task.name) { if !self.quiet(Some(task)) { @@ -513,7 +644,7 @@ impl Run { self.parse_usage_spec_and_init_env(config, task, &mut env, get_args) .await?; - self.exec_task_run_entries(config, task, &env, &prefix, rendered_run_scripts) + self.exec_task_run_entries(config, task, &env, &prefix, rendered_run_scripts, sched_tx) .await?; } @@ -539,6 +670,7 @@ impl Run { env: &BTreeMap, prefix: &str, rendered_scripts: Vec<(String, Vec)>, + sched_tx: Arc>)>>, ) -> Result<()> { use crate::task::RunEntry; let mut script_iter = rendered_scripts.into_iter(); @@ -550,10 +682,11 @@ impl Run { } } RunEntry::SingleTask { task: spec } => { - self.exec_single_task_spec(config, spec).await?; + self.inject_and_wait(config, &[spec.to_string()], sched_tx.clone()) + .await?; } RunEntry::TaskGroup { tasks } => { - self.exec_task_group_specs(config, tasks) + self.inject_and_wait(config, tasks, sched_tx.clone()) .await?; } } @@ -561,57 +694,86 @@ impl Run { Ok(()) } - async fn exec_single_task_spec( - &self, - config: &Arc, - spec: &str, - ) -> Result<()> { - let mise_bin = env::MISE_BIN.display().to_string(); - #[cfg(unix)] - let cmd = format!("{} run {}", shell_words::quote(&mise_bin), spec); - #[cfg(windows)] - let cmd = format!("{} run {}", mise_bin, spec); - // Execute as a shell script so output/prefix handling remains consistent - let args: Vec = vec![]; - // Build a fake task prefix using the parent task's, we already have it from call site - // but exec_script requires a real Task; it's fine to reuse current task - // We need env/prefix from outer call so this function is only used inside exec_task_run_entries - // Therefore, just return the command string to caller in future refactors if needed - // For now just spawn with current settings - let dummy_task = Task::default(); - let prefix = dummy_task.estyled_prefix(); - let env_map: BTreeMap = Default::default(); - self.exec_script(&cmd, &args, &dummy_task, &env_map, &prefix).await - } - - async fn exec_task_group_specs( + async fn inject_and_wait( &self, config: &Arc, specs: &[String], + sched_tx: Arc>)>>, ) -> Result<()> { - let mise_bin = env::MISE_BIN.display().to_string(); - #[cfg(unix)] - let cmd = { - let parts = specs - .iter() - .map(|s| format!("({} run {}) &", shell_words::quote(&mise_bin), s)) - .collect::(); - format!("{} wait", parts) - }; - #[cfg(windows)] - let cmd = { - // Fallback to sequential on Windows - specs - .iter() - .map(|s| format!("{} run {}", mise_bin, s)) - .collect::>() - .join(" & ") - }; - let args: Vec = vec![]; - let dummy_task = Task::default(); - let prefix = dummy_task.estyled_prefix(); - let env_map: BTreeMap = Default::default(); - self.exec_script(&cmd, &args, &dummy_task, &env_map, &prefix).await + trace!("inject start: {}", specs.join(", ")); + // Build tasks list from specs + let tasks_map = config.tasks_with_aliases().await?; + let mut to_run: Vec = vec![]; + for spec in specs { + let (name, args) = split_task_spec(spec); + let matches = tasks_map.get_matching(name)?; + ensure!(!matches.is_empty(), "task not found: {}", name); + for t in matches { + let mut t = (*t).clone(); + t.args = args.clone(); + to_run.push(t); + } + } + let sub_deps = Deps::new(config, to_run).await?; + let sub_deps = Arc::new(Mutex::new(sub_deps)); + + // Pump subgraph into scheduler and signal completion via oneshot when done + let (done_tx, done_rx) = oneshot::channel::<()>(); + { + let sub_deps_clone = sub_deps.clone(); + let sched_tx = sched_tx.clone(); + // forward initial leaves synchronously + { + let mut rx = sub_deps_clone.lock().await.subscribe(); + let mut any = false; + loop { + match rx.try_recv() { + Ok(Some(task)) => { + any = true; + trace!("inject initial leaf: {} {}", task.name, task.args.join(" ")); + let _ = sched_tx.send((task, sub_deps_clone.clone())); + } + Ok(None) => { + trace!("inject initial done"); + break; + } + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => { + break; + } + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => { + break; + } + } + } + if !any { + trace!("inject had no initial leaves"); + } + } + // then forward remaining leaves asynchronously + tokio::spawn(async move { + let mut rx = sub_deps_clone.lock().await.subscribe(); + while let Some(msg) = rx.recv().await { + match msg { + Some(task) => { + trace!( + "inject leaf scheduled: {} {}", + task.name, + task.args.join(" ") + ); + let _ = sched_tx.send((task, sub_deps_clone.clone())); + } + None => { + let _ = done_tx.send(()); + trace!("inject complete"); + break; + } + } + } + }); + } + + done_rx.await.map_err(|e| eyre!(e))?; + Ok(()) } async fn exec_script( From 921bf8e45634b23c7f9a46aa3c49fec998b04b3c Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Sat, 6 Sep 2025 14:49:10 -0500 Subject: [PATCH 3/6] refactor(schema): dedupe run/run_windows with .run_field and .run_step --- schema/mise-task.json | 83 ++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 32 deletions(-) diff --git a/schema/mise-task.json b/schema/mise-task.json index e6241b94a5..fcabcae77b 100644 --- a/schema/mise-task.json +++ b/schema/mise-task.json @@ -4,6 +4,55 @@ "title": "mise-task-schema", "type": "object", "$defs": { + "run_step": { + "oneOf": [ + { + "description": "script to run", + "type": "string" + }, + { + "additionalProperties": false, + "properties": { + "task": { + "description": "single task name (with optional args) to run", + "type": "string" + } + }, + "required": ["task"], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "tasks": { + "description": "parallel task group to run", + "items": { + "description": "task name and args", + "type": "string" + }, + "type": "array" + } + }, + "required": ["tasks"], + "type": "object" + } + ] + }, + "run_field": { + "oneOf": [ + { + "description": "script to run", + "type": "string" + }, + { + "description": "list of steps mixing scripts and task references", + "items": { + "$ref": "#/$defs/run_step" + }, + "type": "array" + } + ] + }, "task": { "oneOf": [ { @@ -217,38 +266,8 @@ "description": "directly connect task to stdin/stdout/stderr", "type": "boolean" }, - "run": { - "oneOf": [ - { - "description": "script to run", - "type": "string" - }, - { - "description": "script to run", - "items": { - "description": "script to run", - "type": "string" - }, - "type": "array" - } - ] - }, - "run_windows": { - "oneOf": [ - { - "description": "script to run on windows", - "type": "string" - }, - { - "description": "script to run on windows", - "items": { - "description": "script to run on windows", - "type": "string" - }, - "type": "array" - } - ] - }, + "run": { "$ref": "#/$defs/run_field" }, + "run_windows": { "$ref": "#/$defs/run_field" }, "file": { "description": "Execute an external script", "type": "string" From 73bb0fe314b3904e3df174fc4b645804dd469fcc Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Sat, 6 Sep 2025 15:02:03 -0500 Subject: [PATCH 4/6] refactor(task): minor cleanups in deps/task display for run entries --- src/cli/run.rs | 2 +- src/cli/tasks/info.rs | 6 +++--- src/task/deps.rs | 9 +-------- src/task/mod.rs | 25 ++++++++++++++++++------- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/cli/run.rs b/src/cli/run.rs index 0c171ca926..db8b25df48 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -200,7 +200,7 @@ pub struct Run { #[clap(skip)] pub timed_outputs: Arc>>, - // Do not use cache on remote tasks + /// Do not use cache for remote tasks #[clap(long, verbatim_doc_comment, env = "MISE_TASK_REMOTE_NO_CACHE")] pub no_cache: bool, } diff --git a/src/cli/tasks/info.rs b/src/cli/tasks/info.rs index 4fa6b1b53a..2eed3ba6c3 100644 --- a/src/cli/tasks/info.rs +++ b/src/cli/tasks/info.rs @@ -82,9 +82,9 @@ impl TasksInfo { if let Some(file) = &task.file { info::inline_section("File", display_path(file))?; } - let run_scripts = task.run_script_strings(); - if !run_scripts.is_empty() { - info::section("Run", run_scripts.join("\n"))?; + let run = task.run(); + if !run.is_empty() { + info::section("Run", run.iter().map(|e| e.to_string()).join("\n"))?; } if !task.env.is_empty() { let env_display = task diff --git a/src/task/deps.rs b/src/task/deps.rs index 72cca72947..6f7d31d045 100644 --- a/src/task/deps.rs +++ b/src/task/deps.rs @@ -8,7 +8,6 @@ use std::{ sync::Arc, }; use tokio::sync::mpsc; -use tokio::sync::Notify; #[derive(Debug, Clone)] pub struct Deps { @@ -16,7 +15,7 @@ pub struct Deps { sent: HashSet<(String, Vec)>, // tasks+args that have already started so should not run again removed: HashSet<(String, Vec)>, // tasks+args that have already finished to track if we are in an infinitve loop tx: mpsc::UnboundedSender>, - pub completed: Notify, + // not clone, notify waiters via tx None } pub fn task_key(task: &Task) -> (String, Vec) { @@ -71,7 +70,6 @@ impl Deps { tx, sent, removed, - completed: Notify::new(), }) } @@ -126,7 +124,6 @@ impl Deps { let key = (task.name.clone(), task.args.clone()); self.removed.insert(key); self.emit_leaves(); - self.completed.notify_waiters(); } } @@ -207,10 +204,6 @@ impl Deps { self.emit_leaves(); Ok(()) } - - pub fn is_removed_key(&self, key: &(String, Vec)) -> bool { - self.removed.contains(key) - } } fn leaves(graph: &DiGraph) -> Vec { diff --git a/src/task/mod.rs b/src/task/mod.rs index 1634ae5816..98f63816b0 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -15,8 +15,8 @@ use globset::GlobBuilder; use indexmap::IndexMap; use itertools::Itertools; use petgraph::prelude::*; -use serde_derive::{Deserialize, Serialize}; use serde::de; +use serde_derive::{Deserialize, Serialize}; use std::borrow::Cow; use std::cmp::Ordering; use std::collections::BTreeMap; @@ -62,6 +62,16 @@ impl std::str::FromStr for RunEntry { } } +impl Display for RunEntry { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + RunEntry::Script(s) => write!(f, "{}", s), + RunEntry::SingleTask { task } => write!(f, "task: {task}"), + RunEntry::TaskGroup { tasks } => write!(f, "tasks: {}", tasks.join(", ")), + } + } +} + #[derive(Debug, Clone, Deserialize)] #[serde(deny_unknown_fields)] pub struct Task { @@ -716,7 +726,9 @@ impl Default for Task { } } -pub fn deserialize_run_entries<'de, D>(deserializer: D) -> std::result::Result, D::Error> +pub fn deserialize_run_entries<'de, D>( + deserializer: D, +) -> std::result::Result, D::Error> where D: de::Deserializer<'de>, { @@ -738,7 +750,8 @@ where where M: de::MapAccess<'de>, { - let entry: RunEntry = de::Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))?; + let entry: RunEntry = + de::Deserialize::deserialize(de::value::MapAccessDeserializer::new(map))?; Ok(vec![entry]) } @@ -762,10 +775,8 @@ impl Display for Task { let cmd = self .run() .iter() - .find_map(|e| match e { - RunEntry::Script(s) => Some(s.clone()), - _ => None, - }) + .map(|e| e.to_string()) + .next() .or_else(|| self.file.as_ref().map(display_path)); if let Some(cmd) = cmd { From 3013f775827fbba87e76be425fc2e31f2dbed5d3 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Sat, 6 Sep 2025 20:05:11 +0000 Subject: [PATCH 5/6] [autofix.ci] apply automated fixes --- schema/mise-task.json | 34 ++++++++++++++++++++++++++++-- src/cli/run.rs | 2 +- src/task/deps.rs | 48 ------------------------------------------- 3 files changed, 33 insertions(+), 51 deletions(-) diff --git a/schema/mise-task.json b/schema/mise-task.json index fcabcae77b..47ef9b66ed 100644 --- a/schema/mise-task.json +++ b/schema/mise-task.json @@ -266,8 +266,38 @@ "description": "directly connect task to stdin/stdout/stderr", "type": "boolean" }, - "run": { "$ref": "#/$defs/run_field" }, - "run_windows": { "$ref": "#/$defs/run_field" }, + "run": { + "oneOf": [ + { + "description": "script to run", + "type": "string" + }, + { + "description": "script to run", + "items": { + "description": "script to run", + "type": "string" + }, + "type": "array" + } + ] + }, + "run_windows": { + "oneOf": [ + { + "description": "script to run on windows", + "type": "string" + }, + { + "description": "script to run on windows", + "items": { + "description": "script to run on windows", + "type": "string" + }, + "type": "array" + } + ] + }, "file": { "description": "Execute an external script", "type": "string" diff --git a/src/cli/run.rs b/src/cli/run.rs index db8b25df48..12fa2a4094 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -200,7 +200,7 @@ pub struct Run { #[clap(skip)] pub timed_outputs: Arc>>, - /// Do not use cache for remote tasks + // Do not use cache for remote tasks #[clap(long, verbatim_doc_comment, env = "MISE_TASK_REMOTE_NO_CACHE")] pub no_cache: bool, } diff --git a/src/task/deps.rs b/src/task/deps.rs index 6f7d31d045..87fad3ef61 100644 --- a/src/task/deps.rs +++ b/src/task/deps.rs @@ -156,54 +156,6 @@ impl Deps { } } } - - /// Add new tasks (and their dependencies) to the graph and emit any new leaves - pub async fn add(&mut self, config: &Arc, tasks: Vec) -> eyre::Result<()> { - // Build a dependency subgraph for the incoming tasks similar to new() - let mut indexes = HashMap::new(); - let mut stack: Vec = vec![]; - let mut seen: HashSet = HashSet::new(); - - let mut add_idx = |task: &Task, graph: &mut DiGraph| { - *indexes - .entry(task_key(task)) - .or_insert_with(|| graph.add_node(task.clone())) - }; - - for t in &tasks { - // Skip if task already exists in graph - if self.node_idx(t).is_none() { - stack.push(t.clone()); - add_idx(t, &mut self.graph); - } - } - - let all_tasks_to_run = resolve_depends(config, tasks).await?; - while let Some(a) = stack.pop() { - if seen.contains(&a) { - continue; - } - let a_idx = add_idx(&a, &mut self.graph); - let (pre, post) = a.resolve_depends(config, &all_tasks_to_run).await?; - for b in pre { - let b_idx = add_idx(&b, &mut self.graph); - self.graph.update_edge(a_idx, b_idx, ()); - if self.node_idx(&b).is_none() { - stack.push(b.clone()); - } - } - for b in post { - let b_idx = add_idx(&b, &mut self.graph); - self.graph.update_edge(b_idx, a_idx, ()); - if self.node_idx(&b).is_none() { - stack.push(b.clone()); - } - } - seen.insert(a); - } - self.emit_leaves(); - Ok(()) - } } fn leaves(graph: &DiGraph) -> Vec { From f03b0bc5ea6b0a2916bb904597b7948ff87dbf8e Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Sat, 6 Sep 2025 15:13:54 -0500 Subject: [PATCH 6/6] docs(tasks): document structured run steps with {task} and {tasks} groups --- docs/tasks/running-tasks.md | 18 +++++------------- docs/tasks/task-configuration.md | 22 ++++++++++++++++++---- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/docs/tasks/running-tasks.md b/docs/tasks/running-tasks.md index 82e418d56d..ed06ed710d 100644 --- a/docs/tasks/running-tasks.md +++ b/docs/tasks/running-tasks.md @@ -138,8 +138,7 @@ depends = ["build"] This will ensure that the `build` task is run before the `test` task. -You can also define a mise task to run other tasks sequentially (or in series). -You can do this by calling `mise run ` in the `run` property of a task. +You can also define a mise task to run other tasks in parallel or in series: ```toml [tasks.example1] @@ -148,19 +147,12 @@ run = "echo 'example1'" [tasks.example2] run = "mise example2" -[tasks.one_by_one] -run = [ - 'mise run example1', - 'mise run example2', -] -``` - -This assumes that `mise` is in your `PATH`. If you are using [mise generate bootstrap](/cli/generate/bootstrap.html) or if `mise` is not on `PATH`, it's better to use [`{{mise_bin}}`](/templates.html#variables) instead of `mise` in the task definition. +[tasks.example3] +run = "echo 'example3'" -```toml [tasks.one_by_one] run = [ - '{{mise_bin}} run example1', - '{{mise_bin}} run example2', + { task = "example1" }, # will wait for example1 to finish before running the next step + { tasks = ["example2", "example3"] }, # these 2 are run in parallel ] ``` diff --git a/docs/tasks/task-configuration.md b/docs/tasks/task-configuration.md index ea261bb335..a671120450 100644 --- a/docs/tasks/task-configuration.md +++ b/docs/tasks/task-configuration.md @@ -9,10 +9,22 @@ All examples are in toml-task format instead of file, however they apply in both ### `run` -- **Type**: `string | string[]` +- **Type**: `string | (string | { task: string } | { tasks: string[] })[]` + +The command(s) to run. This is the only required property for a task. -The command to run. This is the only required property for a task. Note that tasks can be defined in -`mise.toml` in various ways in order to simplify the config, e.g.: these are all equal: +You can now mix scripts with task references: + +```toml +[tasks.grouped] +run = [ + { task = "t1" }, # run t1 (with its dependencies) + { tasks = ["t2", "t3"] }, # run t2 and t3 in parallel (with their dependencies) + "echo end", # then run a script +] +``` + +Simple forms still work and are equivalent: ```toml tasks.a = "echo hello" @@ -26,7 +38,9 @@ run = ["echo hello"] ### `run_windows` -An alternative script to run when `mise run` is executed on windows: +- **Type**: `string | (string | { task: string } | { tasks: string[] })[]` + +Windows-specific variant of `run` supporting the same structured syntax: ```toml [tasks.build]