-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(task): stream keep-order output in real-time per task #8164
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
90a6297
c2fe26c
8e22c89
e616f3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,13 +1,174 @@ | ||||||||||||||||||||||||||||||||||||
| use crate::config::Settings; | ||||||||||||||||||||||||||||||||||||
| use crate::task::Task; | ||||||||||||||||||||||||||||||||||||
| use crate::task::task_helpers::task_needs_permit; | ||||||||||||||||||||||||||||||||||||
| use crate::task::task_output::TaskOutput; | ||||||||||||||||||||||||||||||||||||
| use crate::ui::multi_progress_report::MultiProgressReport; | ||||||||||||||||||||||||||||||||||||
| use crate::ui::progress_report::SingleReport; | ||||||||||||||||||||||||||||||||||||
| use indexmap::IndexMap; | ||||||||||||||||||||||||||||||||||||
| use std::sync::{Arc, Mutex}; | ||||||||||||||||||||||||||||||||||||
| use std::time::SystemTime; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| type KeepOrderOutputs = (Vec<(String, String)>, Vec<(String, String)>); | ||||||||||||||||||||||||||||||||||||
| /// A single line of output, tagged by stream. | ||||||||||||||||||||||||||||||||||||
| pub enum KeepOrderLine { | ||||||||||||||||||||||||||||||||||||
| Stdout(String, String), // (prefix, line) | ||||||||||||||||||||||||||||||||||||
| Stderr(String, String), // (prefix, line) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Streaming state for keep-order mode. | ||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||
| /// One task at a time is "active" and streams output in real-time. | ||||||||||||||||||||||||||||||||||||
| /// Other tasks buffer their output. When the active task finishes, | ||||||||||||||||||||||||||||||||||||
| /// any already-finished tasks' buffers are flushed, then the next | ||||||||||||||||||||||||||||||||||||
| /// running task with buffered output is promoted to stream live. | ||||||||||||||||||||||||||||||||||||
| pub struct KeepOrderState { | ||||||||||||||||||||||||||||||||||||
| /// The task whose output is currently being streamed live | ||||||||||||||||||||||||||||||||||||
| active: Option<Task>, | ||||||||||||||||||||||||||||||||||||
| /// Buffered output for non-active tasks (insertion order preserved) | ||||||||||||||||||||||||||||||||||||
| buffers: IndexMap<Task, Vec<KeepOrderLine>>, | ||||||||||||||||||||||||||||||||||||
| /// Tasks that finished while not active (in order of completion) | ||||||||||||||||||||||||||||||||||||
| finished: Vec<Task>, | ||||||||||||||||||||||||||||||||||||
| /// Set after flush_all — further output prints directly | ||||||||||||||||||||||||||||||||||||
| done: bool, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| impl KeepOrderState { | ||||||||||||||||||||||||||||||||||||
| pub fn new() -> Self { | ||||||||||||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||||||||||||
| active: None, | ||||||||||||||||||||||||||||||||||||
| buffers: IndexMap::new(), | ||||||||||||||||||||||||||||||||||||
| finished: Vec::new(), | ||||||||||||||||||||||||||||||||||||
| done: false, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| pub fn init_task(&mut self, task: &Task) { | ||||||||||||||||||||||||||||||||||||
| self.buffers.entry(task.clone()).or_default(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Whether this task should stream live (is active, or is first in | ||||||||||||||||||||||||||||||||||||
| /// definition order when no task is active yet). | ||||||||||||||||||||||||||||||||||||
| fn is_active(&self, task: &Task) -> bool { | ||||||||||||||||||||||||||||||||||||
| if let Some(active) = &self.active { | ||||||||||||||||||||||||||||||||||||
| active == task | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| // No active task yet — only the first task in definition order may claim it | ||||||||||||||||||||||||||||||||||||
| self.buffers.first().map(|(t, _)| t) == Some(task) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
cursor[bot] marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Called when a stdout line is produced by a task's process. | ||||||||||||||||||||||||||||||||||||
| pub fn on_stdout(&mut self, task: &Task, prefix: String, line: String) { | ||||||||||||||||||||||||||||||||||||
| if self.done || self.is_active(task) { | ||||||||||||||||||||||||||||||||||||
| self.active = Some(task.clone()); | ||||||||||||||||||||||||||||||||||||
| print_stdout(&prefix, &line); | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| self.buffers | ||||||||||||||||||||||||||||||||||||
| .entry(task.clone()) | ||||||||||||||||||||||||||||||||||||
| .or_default() | ||||||||||||||||||||||||||||||||||||
| .push(KeepOrderLine::Stdout(prefix, line)); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+60
to
+69
|
||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Called when a stderr line is produced by a task's process, | ||||||||||||||||||||||||||||||||||||
| /// or when metadata (command echo, timing) is emitted for a task. | ||||||||||||||||||||||||||||||||||||
| pub fn on_stderr(&mut self, task: &Task, prefix: String, line: String) { | ||||||||||||||||||||||||||||||||||||
| if self.done || self.is_active(task) { | ||||||||||||||||||||||||||||||||||||
| self.active = Some(task.clone()); | ||||||||||||||||||||||||||||||||||||
| print_stderr(&prefix, &line); | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| self.buffers | ||||||||||||||||||||||||||||||||||||
| .entry(task.clone()) | ||||||||||||||||||||||||||||||||||||
| .or_default() | ||||||||||||||||||||||||||||||||||||
| .push(KeepOrderLine::Stderr(prefix, line)); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Called when a task finishes execution. | ||||||||||||||||||||||||||||||||||||
| pub fn on_task_finished(&mut self, task: &Task) { | ||||||||||||||||||||||||||||||||||||
| if !self.buffers.contains_key(task) { | ||||||||||||||||||||||||||||||||||||
| return; // Not a keep-order task | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| if self.is_active(task) { | ||||||||||||||||||||||||||||||||||||
| // Active task finished — clear it, flush waiting tasks, promote next | ||||||||||||||||||||||||||||||||||||
| self.active = None; | ||||||||||||||||||||||||||||||||||||
| self.buffers.shift_remove(task); | ||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Active task's buffered output discarded on finishLow Severity In Additional Locations (1) |
||||||||||||||||||||||||||||||||||||
| self.flush_finished(); | ||||||||||||||||||||||||||||||||||||
| self.promote_next(); | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| // Non-active task finished — remember it for later flushing | ||||||||||||||||||||||||||||||||||||
| self.finished.push(task.clone()); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
cursor[bot] marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Flush contiguous finished tasks from the front of the buffer. | ||||||||||||||||||||||||||||||||||||
| /// Stops at the first non-finished task to preserve definition order. | ||||||||||||||||||||||||||||||||||||
| fn flush_finished(&mut self) { | ||||||||||||||||||||||||||||||||||||
| let mut finished: std::collections::HashSet<_> = self.finished.drain(..).collect(); | ||||||||||||||||||||||||||||||||||||
| loop { | ||||||||||||||||||||||||||||||||||||
| let Some((task, _)) = self.buffers.first() else { | ||||||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
| if !finished.remove(task) { | ||||||||||||||||||||||||||||||||||||
| break; // Hit a non-finished task, stop | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| let task = task.clone(); | ||||||||||||||||||||||||||||||||||||
| if let Some(lines) = self.buffers.shift_remove(&task) { | ||||||||||||||||||||||||||||||||||||
| Self::print_lines(&lines); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+105
to
+118
|
||||||||||||||||||||||||||||||||||||
| // Re-add finished tasks we couldn't flush (behind a still-running task) | ||||||||||||||||||||||||||||||||||||
| self.finished.extend(finished); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
cursor[bot] marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| /// Promote the next buffered (still-running) task to active and | ||||||||||||||||||||||||||||||||||||
| /// flush its current buffer so it can stream live going forward. | ||||||||||||||||||||||||||||||||||||
| fn promote_next(&mut self) { | ||||||||||||||||||||||||||||||||||||
| if let Some((task, _)) = self.buffers.first() { | ||||||||||||||||||||||||||||||||||||
| let task = task.clone(); | ||||||||||||||||||||||||||||||||||||
| self.active = Some(task.clone()); | ||||||||||||||||||||||||||||||||||||
| if let Some(lines) = self.buffers.get_mut(&task) { | ||||||||||||||||||||||||||||||||||||
| let lines = std::mem::take(lines); | ||||||||||||||||||||||||||||||||||||
| Self::print_lines(&lines); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
Comment on lines
+126
to
+132
|
||||||||||||||||||||||||||||||||||||
| if let Some((task, _)) = self.buffers.first() { | |
| let task = task.clone(); | |
| self.active = Some(task.clone()); | |
| if let Some(lines) = self.buffers.get_mut(&task) { | |
| let lines = std::mem::take(lines); | |
| Self::print_lines(&lines); | |
| } | |
| // Find the first task that actually has buffered output. | |
| if let Some((task, lines)) = self | |
| .buffers | |
| .iter_mut() | |
| .find(|(_, lines)| !lines.is_empty()) | |
| { | |
| let task = task.clone(); | |
| self.active = Some(task.clone()); | |
| let lines = std::mem::take(lines); | |
| Self::print_lines(&lines); |


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the
with_on_stdoutclosure,prefix_str.clone()is called for every line of output. For tasks that produce a lot of output, this can be inefficient. A similar issue exists forwith_on_stderr.You could optimize this by using an
Arc<String>for the prefix. This would involve a few changes across files:In
src/task/task_executor.rs, create anArc<String>for the prefix outside the closure and clone theArcinside:In
src/task/task_output_handler.rs, updateKeepOrderLineto store anArc<String>:Update
on_stdoutandon_stderrinKeepOrderStateto acceptArc<String>:This would replace expensive string cloning with cheap
Arccloning.