From 90a62976d7779d6f3a3739a7008386ece849d784 Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Sun, 15 Feb 2026 13:46:53 +0000 Subject: [PATCH 1/4] feat(run): stream keep-order output in real-time per task Previously, keep-order mode buffered ALL task output and printed everything after all tasks completed. Now, one task at a time streams its output live while other tasks buffer. When the active task finishes, buffered output from already-finished tasks is flushed, then the next running task is promoted to stream live. Closes #8136 Co-Authored-By: Claude Opus 4.6 --- e2e/tasks/test_task_keep_order | 20 ++++ settings.toml | 2 +- src/cli/run.rs | 3 + src/task/task_executor.rs | 32 +++--- src/task/task_output_handler.rs | 188 +++++++++++++++++++++++++++++-- src/task/task_results_display.rs | 26 +---- 6 files changed, 225 insertions(+), 46 deletions(-) diff --git a/e2e/tasks/test_task_keep_order b/e2e/tasks/test_task_keep_order index 0c573cbb97..8562302fbc 100644 --- a/e2e/tasks/test_task_keep_order +++ b/e2e/tasks/test_task_keep_order @@ -16,3 +16,23 @@ tasks.b = "echo b ; exit 1" tasks.all.depends = ['a', 'b'] EOF assert_fail "mise run -o keep-order all" "[b] b" + +# Verify definition order is preserved even when later tasks finish first +cat <mise.toml +[tasks.slow] +run = "sleep 2 && echo slow" + +[tasks.fast] +run = "echo fast" + +[tasks.all] +depends = ['slow', 'fast'] +EOF +output=$(MISE_JOBS=4 mise run -o keep-order all 2>&1) +slow_pos=$(echo "$output" | grep -n '^\[slow\] slow$' | cut -d: -f1) +fast_pos=$(echo "$output" | grep -n '^\[fast\] fast$' | cut -d: -f1) +if [ "$slow_pos" -ge "$fast_pos" ]; then + echo "keep-order did not preserve definition order: slow at $slow_pos, fast at $fast_pos" + echo "$output" + exit 1 +fi diff --git a/settings.toml b/settings.toml index b5b245a4a7..5ca1b5f4cb 100644 --- a/settings.toml +++ b/settings.toml @@ -1703,7 +1703,7 @@ Change output style when executing tasks. This controls the output of `mise run` enum = [ { value = "prefix", description = "(default if jobs > 1) print by line with the prefix of the task name" }, { value = "interleave", description = "(default if jobs == 1 or all tasks run sequentially) print output as it comes in" }, - { value = "keep-order", description = "print output from tasks in the order they are defined" }, + { value = "keep-order", description = "stream one task at a time, buffering others to avoid interleaving" }, { value = "replacing", description = "replace stdout each time a line is printed-this uses similar logic as `mise install`" }, { value = "timed", description = "only show stdout lines that take longer than 1s to complete" }, { value = "quiet", description = "print only stdout/stderr from tasks and nothing from mise" }, diff --git a/src/cli/run.rs b/src/cli/run.rs index 79822fec27..18b40ff3da 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -475,6 +475,9 @@ impl Run { } this.add_failed_task(task.clone(), status); } + if let Some(oh) = &this.output_handler { + oh.keep_order_state.lock().unwrap().on_task_finished(&task); + } deps_for_remove.lock().await.remove(&task); trace!("deps removed: {} {}", task.name, task.args.join(" ")); in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); diff --git a/src/task/task_executor.rs b/src/task/task_executor.rs index 865ff9319f..fc6a579e7b 100644 --- a/src/task/task_executor.rs +++ b/src/task/task_executor.rs @@ -669,27 +669,27 @@ impl TaskExecutor { } TaskOutput::KeepOrder => { if !task.silent.suppresses_stdout() { - cmd = cmd.with_on_stdout(|line| { - let mut map = self.output_handler.keep_order_output.lock().unwrap(); - if !map.contains_key(task) { - map.insert(task.clone(), Default::default()); - } - if let Some(entry) = map.get_mut(task) { - entry.0.push((prefix.to_string(), line)); - } + let state = self.output_handler.keep_order_state.clone(); + let task_clone = task.clone(); + let prefix_str = prefix.to_string(); + cmd = cmd.with_on_stdout(move |line| { + state + .lock() + .unwrap() + .on_stdout(&task_clone, prefix_str.clone(), line); }); } else { cmd = cmd.stdout(Stdio::null()); } if !task.silent.suppresses_stderr() { - cmd = cmd.with_on_stderr(|line| { - let mut map = self.output_handler.keep_order_output.lock().unwrap(); - if !map.contains_key(task) { - map.insert(task.clone(), Default::default()); - } - if let Some(entry) = map.get_mut(task) { - entry.1.push((prefix.to_string(), line)); - } + let state = self.output_handler.keep_order_state.clone(); + let task_clone = task.clone(); + let prefix_str = prefix.to_string(); + cmd = cmd.with_on_stderr(move |line| { + state + .lock() + .unwrap() + .on_stderr(&task_clone, prefix_str.clone(), line); }); } else { cmd = cmd.stderr(Stdio::null()); diff --git a/src/task/task_output_handler.rs b/src/task/task_output_handler.rs index 9c71c38d41..605fffa10f 100644 --- a/src/task/task_output_handler.rs +++ b/src/task/task_output_handler.rs @@ -1,5 +1,6 @@ use crate::config::Settings; use crate::task::Task; +use crate::task::task_helpers::task_needs_permit; use crate::task::task_output::TaskOutput; use crate::ui::multi_progress_report::MultiProgressReport; use crate::ui::progress_report::SingleReport; @@ -7,7 +8,167 @@ use indexmap::IndexMap; use std::sync::{Arc, Mutex}; use std::time::SystemTime; -type KeepOrderOutputs = (Vec<(String, String)>, Vec<(String, String)>); +/// A single line of output, tagged by stream. +pub enum KeepOrderLine { + Stdout(String, String), // (prefix, line) + Stderr(String, String), // (prefix, line) +} + +/// Streaming state for keep-order mode. +/// +/// One task at a time is "active" and streams output in real-time. +/// Other tasks buffer their output. When the active task finishes, +/// any already-finished tasks' buffers are flushed, then the next +/// running task with buffered output is promoted to stream live. +pub struct KeepOrderState { + /// The task whose output is currently being streamed live + active: Option, + /// Buffered output for non-active tasks (insertion order preserved) + buffers: IndexMap>, + /// Tasks that finished while not active (in order of completion) + finished: Vec, + /// Set after flush_all — further output prints directly + done: bool, +} + +impl KeepOrderState { + pub fn new() -> Self { + Self { + active: None, + buffers: IndexMap::new(), + finished: Vec::new(), + done: false, + } + } + + pub fn init_task(&mut self, task: &Task) { + self.buffers.entry(task.clone()).or_default(); + } + + /// Whether this task should stream live (is active, or is first in + /// definition order when no task is active yet). + fn is_active(&self, task: &Task) -> bool { + if let Some(active) = &self.active { + active == task + } else { + // No active task yet — only the first task in definition order may claim it + self.buffers.first().map(|(t, _)| t) == Some(task) + } + } + + /// Called when a stdout line is produced by a task's process. + pub fn on_stdout(&mut self, task: &Task, prefix: String, line: String) { + if self.done || self.is_active(task) { + self.active = Some(task.clone()); + print_stdout(&prefix, &line); + } else { + self.buffers + .entry(task.clone()) + .or_default() + .push(KeepOrderLine::Stdout(prefix, line)); + } + } + + /// Called when a stderr line is produced by a task's process, + /// or when metadata (command echo, timing) is emitted for a task. + pub fn on_stderr(&mut self, task: &Task, prefix: String, line: String) { + if self.done || self.is_active(task) { + self.active = Some(task.clone()); + print_stderr(&prefix, &line); + } else { + self.buffers + .entry(task.clone()) + .or_default() + .push(KeepOrderLine::Stderr(prefix, line)); + } + } + + /// Called when a task finishes execution. + pub fn on_task_finished(&mut self, task: &Task) { + if !self.buffers.contains_key(task) { + return; // Not a keep-order task + } + if self.is_active(task) { + // Active task finished — clear it, flush waiting tasks, promote next + self.active = None; + self.buffers.shift_remove(task); + self.flush_finished(); + self.promote_next(); + } else { + // Non-active task finished — remember it for later flushing + self.finished.push(task.clone()); + } + } + + /// Flush contiguous finished tasks from the front of the buffer. + /// Stops at the first non-finished task to preserve definition order. + fn flush_finished(&mut self) { + let mut finished: std::collections::HashSet<_> = self.finished.drain(..).collect(); + loop { + let Some((task, _)) = self.buffers.first() else { + break; + }; + if !finished.remove(task) { + break; // Hit a non-finished task, stop + } + let task = task.clone(); + if let Some(lines) = self.buffers.shift_remove(&task) { + Self::print_lines(&lines); + } + } + // Re-add finished tasks we couldn't flush (behind a still-running task) + self.finished.extend(finished); + } + + /// Promote the next buffered (still-running) task to active and + /// flush its current buffer so it can stream live going forward. + fn promote_next(&mut self) { + if let Some((task, _)) = self.buffers.first() { + let task = task.clone(); + self.active = Some(task.clone()); + if let Some(lines) = self.buffers.get_mut(&task) { + let lines = std::mem::take(lines); + Self::print_lines(&lines); + } + } + } + + fn print_lines(lines: &[KeepOrderLine]) { + for line in lines { + match line { + KeepOrderLine::Stdout(prefix, line) => print_stdout(prefix, line), + KeepOrderLine::Stderr(prefix, line) => print_stderr(prefix, line), + } + } + } + + /// Safety-net: flush any remaining output (called at the very end). + /// After this, any further output prints directly. + pub fn flush_all(&mut self) { + self.active = None; + self.flush_finished(); + for (_, lines) in self.buffers.drain(..) { + Self::print_lines(&lines); + } + self.done = true; + } +} + +fn print_stdout(prefix: &str, line: &str) { + if console::colors_enabled() { + prefix_println!(prefix, "{line}\x1b[0m"); + } else { + prefix_println!(prefix, "{line}"); + } +} + +fn print_stderr(prefix: &str, line: &str) { + if console::colors_enabled_stderr() { + prefix_eprintln!(prefix, "{line}\x1b[0m"); + } else { + prefix_eprintln!(prefix, "{line}"); + } +} /// Configuration for OutputHandler pub struct OutputHandlerConfig { @@ -23,7 +184,7 @@ pub struct OutputHandlerConfig { /// Handles task output routing, formatting, and display pub struct OutputHandler { - pub keep_order_output: Arc>>, + pub keep_order_state: Arc>, pub task_prs: IndexMap>>, pub timed_outputs: Arc>>, @@ -41,7 +202,7 @@ pub struct OutputHandler { impl Clone for OutputHandler { fn clone(&self) -> Self { Self { - keep_order_output: self.keep_order_output.clone(), + keep_order_state: self.keep_order_state.clone(), task_prs: self.task_prs.clone(), timed_outputs: self.timed_outputs.clone(), prefix: self.prefix, @@ -59,7 +220,7 @@ impl Clone for OutputHandler { impl OutputHandler { pub fn new(config: OutputHandlerConfig) -> Self { Self { - keep_order_output: Arc::new(Mutex::new(IndexMap::new())), + keep_order_state: Arc::new(Mutex::new(KeepOrderState::new())), task_prs: IndexMap::new(), timed_outputs: Arc::new(Mutex::new(IndexMap::new())), prefix: config.prefix, @@ -77,10 +238,10 @@ impl OutputHandler { pub fn init_task(&mut self, task: &Task) { match self.output(Some(task)) { TaskOutput::KeepOrder => { - self.keep_order_output - .lock() - .unwrap() - .insert(task.clone(), Default::default()); + // Only add tasks that produce output (not orchestrator-only tasks) + if task_needs_permit(task) { + self.keep_order_state.lock().unwrap().init_task(task); + } } TaskOutput::Replacing => { let pr = MultiProgressReport::get().add(&task.estyled_prefix()); @@ -139,9 +300,18 @@ impl OutputHandler { } } - /// Print error message for a task + /// Print error/metadata message for a task. + /// For keep-order mode, routes through the streaming state so messages + /// stay ordered with the task's stdout/stderr. pub fn eprint(&self, task: &Task, prefix: &str, line: &str) { match self.output(Some(task)) { + TaskOutput::KeepOrder => { + self.keep_order_state.lock().unwrap().on_stderr( + task, + prefix.to_string(), + line.to_string(), + ); + } TaskOutput::Replacing => { let pr = self.task_prs.get(task).unwrap().clone(); pr.set_message(format!("{prefix} {line}")); diff --git a/src/task/task_results_display.rs b/src/task/task_results_display.rs index e261483b82..abb0153bad 100644 --- a/src/task/task_results_display.rs +++ b/src/task/task_results_display.rs @@ -35,30 +35,16 @@ impl TaskResultsDisplay { self.exit_if_failed(); } - /// Display keep-order output if using that mode + /// Flush any remaining keep-order output (safety net) fn display_keep_order_output(&self) { if self.output_handler.output(None) != TaskOutput::KeepOrder { return; } - - let output = self.output_handler.keep_order_output.lock().unwrap(); - - for (out, err) in output.values() { - for (prefix, line) in out { - if console::colors_enabled() { - prefix_println!(prefix, "{line}\x1b[0m"); - } else { - prefix_println!(prefix, "{line}"); - } - } - for (prefix, line) in err { - if console::colors_enabled_stderr() { - prefix_eprintln!(prefix, "{line}\x1b[0m"); - } else { - prefix_eprintln!(prefix, "{line}"); - } - } - } + self.output_handler + .keep_order_state + .lock() + .unwrap() + .flush_all(); } /// Display timing summary if enabled From c2fe26c90826c81b19a1e3b0604b1e3949093178 Mon Sep 17 00:00:00 2001 From: jdx <216188+jdx@users.noreply.github.com> Date: Mon, 16 Feb 2026 22:25:47 +0000 Subject: [PATCH 2/4] fix(run): address review feedback for keep-order streaming - Gate on_task_finished behind KeepOrder check to avoid unnecessary work in non-keep-order modes - Update settings.toml description to reflect new streaming behavior Co-Authored-By: Claude Opus 4.6 --- settings.toml | 2 +- src/cli/run.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/settings.toml b/settings.toml index 5ca1b5f4cb..411fb60f8c 100644 --- a/settings.toml +++ b/settings.toml @@ -1703,7 +1703,7 @@ Change output style when executing tasks. This controls the output of `mise run` enum = [ { value = "prefix", description = "(default if jobs > 1) print by line with the prefix of the task name" }, { value = "interleave", description = "(default if jobs == 1 or all tasks run sequentially) print output as it comes in" }, - { value = "keep-order", description = "stream one task at a time, buffering others to avoid interleaving" }, + { value = "keep-order", description = "stream one task's output live while buffering others, printing in definition order as tasks complete" }, { value = "replacing", description = "replace stdout each time a line is printed-this uses similar logic as `mise install`" }, { value = "timed", description = "only show stdout lines that take longer than 1s to complete" }, { value = "quiet", description = "print only stdout/stderr from tasks and nothing from mise" }, diff --git a/src/cli/run.rs b/src/cli/run.rs index 18b40ff3da..f32be3f788 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -476,7 +476,9 @@ impl Run { this.add_failed_task(task.clone(), status); } if let Some(oh) = &this.output_handler { - oh.keep_order_state.lock().unwrap().on_task_finished(&task); + if oh.output(None) == TaskOutput::KeepOrder { + oh.keep_order_state.lock().unwrap().on_task_finished(&task); + } } deps_for_remove.lock().await.remove(&task); trace!("deps removed: {} {}", task.name, task.args.join(" ")); From 8e22c891744b7f3750f324b0923d8a0b21ea9640 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 16 Feb 2026 22:33:37 +0000 Subject: [PATCH 3/4] [autofix.ci] apply automated fixes --- src/cli/run.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cli/run.rs b/src/cli/run.rs index f32be3f788..705da537af 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -475,11 +475,10 @@ impl Run { } this.add_failed_task(task.clone(), status); } - if let Some(oh) = &this.output_handler { - if oh.output(None) == TaskOutput::KeepOrder { + if let Some(oh) = &this.output_handler + && oh.output(None) == TaskOutput::KeepOrder { oh.keep_order_state.lock().unwrap().on_task_finished(&task); } - } deps_for_remove.lock().await.remove(&task); trace!("deps removed: {} {}", task.name, task.args.join(" ")); in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); From e616f3c578e0acb20cf63825a53af92e86b0b4c7 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Mon, 16 Feb 2026 22:39:07 +0000 Subject: [PATCH 4/4] [autofix.ci] apply automated fixes (attempt 2/3) --- src/cli/run.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/cli/run.rs b/src/cli/run.rs index 705da537af..bee92de69d 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -476,9 +476,10 @@ impl Run { this.add_failed_task(task.clone(), status); } if let Some(oh) = &this.output_handler - && oh.output(None) == TaskOutput::KeepOrder { - oh.keep_order_state.lock().unwrap().on_task_finished(&task); - } + && oh.output(None) == TaskOutput::KeepOrder + { + oh.keep_order_state.lock().unwrap().on_task_finished(&task); + } deps_for_remove.lock().await.remove(&task); trace!("deps removed: {} {}", task.name, task.args.join(" ")); in_flight_c.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);