diff --git a/docs/dev-tools/prepare.md b/docs/dev-tools/prepare.md index c8190233c2..7872b4680b 100644 --- a/docs/dev-tools/prepare.md +++ b/docs/dev-tools/prepare.md @@ -97,6 +97,7 @@ run = "npx prisma generate" | `dir` | string | Working directory for the command | | `description` | string | Description shown in output | | `touch_outputs` | bool | Touch output mtimes after a successful run so they appear fresh (default: true) | +| `depends` | string[] | Other provider names that must complete before this one runs | ## Freshness Checking @@ -170,10 +171,35 @@ mise prepare --only npm --only codegen mise prepare --skip npm ``` +## Dependencies + +Providers can declare dependencies on other providers using the `depends` field. A provider +will wait for all its dependencies to complete successfully before running. + +```toml +[prepare.uv] +auto = true + +[prepare.ansible-galaxy] +auto = true +depends = ["uv"] +run = "ansible-galaxy install -f requirements.yml" +sources = ["requirements.yml"] +outputs = [".galaxy-installed"] +``` + +In this example, `ansible-galaxy` will wait for `uv` to finish before starting. + +Providers without `depends` run in parallel as before. If a dependency fails, all providers +that depend on it are skipped. Circular dependencies are detected and the affected providers +are skipped with a warning. + ## Parallel Execution Prepare providers run in parallel, respecting the `jobs` setting for concurrency limits. This speeds up preparation when multiple providers need to run (e.g., both npm and pip). +Providers with `depends` will wait for their dependencies to complete before starting, +while independent providers run concurrently. ```toml [settings] @@ -193,14 +219,17 @@ auto = true [prepare.prisma] auto = true +depends = ["npm"] # needs node_modules first sources = ["prisma/schema.prisma"] outputs = ["node_modules/.prisma/"] run = "npx prisma generate" [prepare.frontend-codegen] +depends = ["npm"] # needs node_modules first sources = ["schema.graphql", "codegen.ts"] outputs = ["src/generated/"] run = "npm run codegen" ``` -Running `mise prep` will check all four providers and run any that are stale, in parallel. +Running `mise prep` will install npm and poetry dependencies in parallel, then run prisma +and frontend-codegen (also in parallel, since they only depend on npm, not each other). diff --git a/e2e/cli/test_prepare_depends b/e2e/cli/test_prepare_depends new file mode 100644 index 0000000000..450f08c0fe --- /dev/null +++ b/e2e/cli/test_prepare_depends @@ -0,0 +1,84 @@ +#!/usr/bin/env bash + +# Test prepare dependency ordering +# Verifies that prepare providers with `depends` run in the correct order + +# Create source files so providers are stale +touch schema.graphql +touch requirements.txt + +cat >mise.toml <<'EOF' +[prepare.step-a] +run = "bash -c 'echo STEP-A >> prepare.log'" +sources = ["schema.graphql"] +outputs = ["gen-a"] +description = "step-a" + +[prepare.step-b] +run = "bash -c 'echo STEP-B >> prepare.log'" +sources = ["requirements.txt"] +outputs = ["gen-b"] +depends = ["step-a"] +description = "step-b" + +[prepare.step-c] +run = "bash -c 'echo STEP-C >> prepare.log'" +sources = ["requirements.txt"] +outputs = ["gen-c"] +depends = ["step-b"] +description = "step-c" +EOF + +# Force run to ensure all steps execute +mise prepare --force 2>&1 + +# Verify all steps ran and in dependency order (a before b before c) +assert "cat prepare.log" "STEP-A +STEP-B +STEP-C" + +# Clean up for next test +rm -f prepare.log + +# Test that independent steps can still run in parallel alongside deps +cat >mise.toml <<'EOF' +[prepare.dep-first] +run = "bash -c 'echo DEP-FIRST >> prepare2.log'" +sources = ["schema.graphql"] +outputs = ["gen-dep-first"] +description = "dep-first" + +[prepare.dep-second] +run = "bash -c 'echo DEP-SECOND >> prepare2.log'" +sources = ["requirements.txt"] +outputs = ["gen-dep-second"] +depends = ["dep-first"] +description = "dep-second" + +[prepare.independent] +run = "bash -c 'echo INDEPENDENT >> prepare2.log'" +sources = ["schema.graphql"] +outputs = ["gen-independent"] +description = "independent" +EOF + +mise prepare --force 2>&1 + +# dep-first must appear before dep-second +output=$(cat prepare2.log) +assert_contains "echo '$output'" "DEP-FIRST" +assert_contains "echo '$output'" "DEP-SECOND" +assert_contains "echo '$output'" "INDEPENDENT" + +# Verify dep-first is before dep-second in the log +dep_first_line=$(grep -n "DEP-FIRST" prepare2.log | head -1 | cut -d: -f1) +dep_second_line=$(grep -n "DEP-SECOND" prepare2.log | head -1 | cut -d: -f1) +if [[ $dep_first_line -ge $dep_second_line ]]; then + echo "ERROR: DEP-FIRST (line $dep_first_line) should appear before DEP-SECOND (line $dep_second_line)" + exit 1 +fi + +rm -f prepare2.log + +# Test dry-run with depends shows what would run +assert_contains "mise prepare --dry-run" "dep-first" diff --git a/schema/mise.json b/schema/mise.json index 21e35d2a6f..9f65dbbb5f 100644 --- a/schema/mise.json +++ b/schema/mise.json @@ -2221,6 +2221,13 @@ "description": { "type": "string", "description": "Description shown in output" + }, + "depends": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Other prepare providers that must complete before this one runs" } }, "additionalProperties": false, diff --git a/src/cli/prepare.rs b/src/cli/prepare.rs index a1a72cffa7..e12f730fee 100644 --- a/src/cli/prepare.rs +++ b/src/cli/prepare.rs @@ -85,6 +85,9 @@ impl Prepare { PrepareStepResult::Skipped(id) => { debug!("Skipped: {}", id); } + PrepareStepResult::Failed(id) => { + miseprintln!("Failed: {}", id); + } } } diff --git a/src/deps_graph.rs b/src/deps_graph.rs new file mode 100644 index 0000000000..8121002ea5 --- /dev/null +++ b/src/deps_graph.rs @@ -0,0 +1,413 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::hash::Hash; + +use eyre::{Result, bail}; +use indexmap::IndexSet; +use petgraph::Direction; +use petgraph::algo::is_cyclic_directed; +use petgraph::stable_graph::{NodeIndex, StableGraph}; +use tokio::sync::mpsc; + +/// Generic dependency graph scheduler using Kahn's algorithm. +/// +/// Emits nodes that are ready to process (all dependencies satisfied) +/// via an mpsc channel. Supports success/failure completion with +/// transitive dependency blocking and cycle detection. +/// +/// Type parameters: +/// - `K`: Key type for identifying nodes +/// - `N`: Node value type stored in the graph +#[derive(Debug)] +pub struct DepsGraph +where + K: Hash + Eq + Clone + fmt::Display, + N: Clone + fmt::Debug, +{ + graph: StableGraph, + node_indices: HashMap, + sent: HashSet, + blocked: HashSet, + tx: mpsc::UnboundedSender>, + key_fn: fn(&N) -> K, +} + +impl DepsGraph +where + K: Hash + Eq + Clone + fmt::Display, + N: Clone + fmt::Debug, +{ + /// Create a new DepsGraph. + /// + /// - `nodes`: Iterator of (key, node) pairs to add to the graph + /// - `edges`: Iterator of (from_key, to_key) pairs meaning "from depends on to" + /// - `key_fn`: Function to extract a key from a node value + pub fn new( + nodes: impl IntoIterator, + edges: impl IntoIterator, + key_fn: fn(&N) -> K, + ) -> Result { + let mut graph = StableGraph::new(); + let mut node_indices = HashMap::new(); + + for (key, node) in nodes { + if node_indices.contains_key(&key) { + continue; + } + let idx = graph.add_node(node); + node_indices.insert(key, idx); + } + + for (from_key, to_key) in edges { + let Some(&from_idx) = node_indices.get(&from_key) else { + continue; + }; + let Some(&to_idx) = node_indices.get(&to_key) else { + bail!("'{}' depends on unknown '{}'", from_key, to_key); + }; + if from_key != to_key { + graph.update_edge(from_idx, to_idx, ()); + } + } + + let (tx, _) = mpsc::unbounded_channel(); + + let mut deps = Self { + graph, + node_indices, + sent: HashSet::new(), + blocked: HashSet::new(), + tx, + key_fn, + }; + + deps.detect_and_block_cycles(); + + Ok(deps) + } + + /// Subscribe to receive nodes that are ready to process. + /// Returns a receiver that emits `Some(node)` for each ready node, + /// followed by `None` when all nodes have been processed. + pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver> { + let (tx, rx) = mpsc::unbounded_channel(); + self.tx = tx; + self.emit_leaves(); + rx + } + + /// Mark a node as successfully completed and emit any newly-ready nodes. + pub fn complete_success(&mut self, key: &K) { + self.remove_node(key); + self.emit_leaves(); + } + + /// Mark a node as failed and block all transitive dependents. + pub fn complete_failure(&mut self, key: &K) { + if let Some(&idx) = self.node_indices.get(key) { + let dependents = self.get_transitive_dependents(idx); + for dep_idx in dependents { + if let Some(dep_node) = self.graph.node_weight(dep_idx) { + let dep_key = (self.key_fn)(dep_node); + self.blocked.insert(dep_key); + } + } + } + + self.remove_node(key); + self.emit_leaves(); + } + + /// Returns whether all nodes have been processed. + pub fn is_empty(&self) -> bool { + self.graph.node_count() == 0 + } + + /// Returns the keys of all blocked nodes (dependency failures or cycles). + pub fn blocked_keys(&self) -> Vec { + self.graph + .node_indices() + .filter_map(|idx| { + let node = self.graph.node_weight(idx)?; + let key = (self.key_fn)(node); + if self.blocked.contains(&key) { + Some(key) + } else { + None + } + }) + .collect() + } + + /// Returns the node values that are blocked. + pub fn blocked_nodes(&self) -> Vec { + self.graph + .node_indices() + .filter_map(|idx| { + let node = self.graph.node_weight(idx)?; + let key = (self.key_fn)(node); + if self.blocked.contains(&key) { + Some(node.clone()) + } else { + None + } + }) + .collect() + } + + /// Detect cycles and mark all nodes in cycles as blocked. + fn detect_and_block_cycles(&mut self) { + if !is_cyclic_directed(&self.graph) { + return; + } + + let mut can_reach_leaf: HashSet = HashSet::new(); + + // Start with all leaf nodes (no outgoing edges = no dependencies) + for idx in self.graph.node_indices() { + if self + .graph + .neighbors_directed(idx, Direction::Outgoing) + .next() + .is_none() + { + can_reach_leaf.insert(idx); + } + } + + // Propagate backwards: if all dependencies of a node can reach a leaf, + // then it can also reach a leaf + let mut changed = true; + while changed { + changed = false; + for idx in self.graph.node_indices() { + if can_reach_leaf.contains(&idx) { + continue; + } + let deps_can_reach = self + .graph + .neighbors_directed(idx, Direction::Outgoing) + .all(|dep_idx| can_reach_leaf.contains(&dep_idx)); + if deps_can_reach + && self + .graph + .neighbors_directed(idx, Direction::Outgoing) + .next() + .is_some() + { + can_reach_leaf.insert(idx); + changed = true; + } + } + } + + // Any node that cannot reach a leaf is in a cycle + for idx in self.graph.node_indices() { + if !can_reach_leaf.contains(&idx) + && let Some(node) = self.graph.node_weight(idx) + { + let key = (self.key_fn)(node); + self.blocked.insert(key); + } + } + } + + /// Emit all nodes that have no remaining dependencies (leaf nodes). + fn emit_leaves(&mut self) { + let leaves = self.find_leaves(); + + for (key, node) in leaves { + if self.sent.contains(&key) || self.blocked.contains(&key) { + continue; + } + + if self.sent.insert(key.clone()) { + trace!("Scheduling: {}", key); + if let Err(e) = self.tx.send(Some(node)) { + trace!("Error sending node: {e:?}"); + } + } + } + + if self.is_all_done() { + trace!("All nodes finished"); + if let Err(e) = self.tx.send(None) { + trace!("Error closing stream: {e:?}"); + } + } + } + + /// Find all leaf nodes (no unsatisfied dependencies). + fn find_leaves(&self) -> Vec<(K, N)> { + self.graph + .externals(Direction::Outgoing) + .filter_map(|idx| { + let node = self.graph.node_weight(idx)?; + Some(((self.key_fn)(node), node.clone())) + }) + .collect() + } + + /// Check if all nodes have been processed (sent, completed, or blocked). + fn is_all_done(&self) -> bool { + if self.is_empty() { + return true; + } + + self.graph.node_indices().all(|idx| { + self.graph + .node_weight(idx) + .map(|node| self.blocked.contains(&(self.key_fn)(node))) + .unwrap_or(true) + }) + } + + /// Remove a node from the graph by its key. + fn remove_node(&mut self, key: &K) { + if let Some(&idx) = self.node_indices.get(key) { + self.graph.remove_node(idx); + self.node_indices.remove(key); + } + } + + /// Get all transitive dependents of a node (nodes that depend on this one). + fn get_transitive_dependents(&self, start_idx: NodeIndex) -> IndexSet { + let mut dependents = IndexSet::new(); + let mut stack = vec![start_idx]; + + while let Some(idx) = stack.pop() { + for neighbor in self.graph.neighbors_directed(idx, Direction::Incoming) { + if dependents.insert(neighbor) { + stack.push(neighbor); + } + } + } + + dependents + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[allow(clippy::ptr_arg)] + fn string_key(s: &String) -> String { + s.clone() + } + + #[test] + fn test_empty_graph() { + let deps: DepsGraph = + DepsGraph::new(vec![], Vec::<(String, String)>::new(), string_key).unwrap(); + assert!(deps.is_empty()); + } + + #[test] + fn test_no_deps_all_ready() { + let nodes = vec![ + ("a".into(), "a".into()), + ("b".into(), "b".into()), + ("c".into(), "c".into()), + ]; + let mut deps: DepsGraph = + DepsGraph::new(nodes, Vec::<(String, String)>::new(), string_key).unwrap(); + let mut rx = deps.subscribe(); + + let mut emitted = vec![]; + while let Ok(Some(id)) = rx.try_recv() { + emitted.push(id); + } + assert_eq!(emitted.len(), 3); + } + + #[test] + fn test_linear_ordering() { + let nodes: Vec<(String, String)> = vec![ + ("a".into(), "a".into()), + ("b".into(), "b".into()), + ("c".into(), "c".into()), + ]; + let edges: Vec<(String, String)> = vec![("b".into(), "a".into()), ("c".into(), "b".into())]; + let mut deps = DepsGraph::new(nodes, edges, string_key).unwrap(); + let mut rx = deps.subscribe(); + + let first = rx.try_recv().unwrap().unwrap(); + assert_eq!(first, "a"); + assert!(rx.try_recv().is_err()); + + deps.complete_success(&"a".into()); + let second = rx.try_recv().unwrap().unwrap(); + assert_eq!(second, "b"); + + deps.complete_success(&"b".into()); + let third = rx.try_recv().unwrap().unwrap(); + assert_eq!(third, "c"); + + deps.complete_success(&"c".into()); + let done = rx.try_recv().unwrap(); + assert!(done.is_none()); + } + + #[test] + fn test_failure_blocks_dependents() { + let nodes: Vec<(String, String)> = vec![ + ("a".into(), "a".into()), + ("b".into(), "b".into()), + ("c".into(), "c".into()), + ("d".into(), "d".into()), + ]; + let edges: Vec<(String, String)> = vec![("b".into(), "a".into()), ("c".into(), "b".into())]; + let mut deps = DepsGraph::new(nodes, edges, string_key).unwrap(); + let mut rx = deps.subscribe(); + + let mut initial = vec![]; + while let Ok(Some(id)) = rx.try_recv() { + initial.push(id); + } + assert_eq!(initial.len(), 2); + assert!(initial.contains(&"a".to_string())); + assert!(initial.contains(&"d".to_string())); + + deps.complete_failure(&"a".into()); + let blocked = deps.blocked_keys(); + assert!(blocked.contains(&"b".to_string())); + assert!(blocked.contains(&"c".to_string())); + + deps.complete_success(&"d".into()); + let done = rx.try_recv().unwrap(); + assert!(done.is_none()); + } + + #[test] + fn test_cycle_detection() { + let nodes: Vec<(String, String)> = vec![ + ("a".into(), "a".into()), + ("b".into(), "b".into()), + ("c".into(), "c".into()), + ]; + let edges: Vec<(String, String)> = vec![("a".into(), "b".into()), ("b".into(), "a".into())]; + let mut deps = DepsGraph::new(nodes, edges, string_key).unwrap(); + + let blocked = deps.blocked_keys(); + assert!(blocked.contains(&"a".to_string())); + assert!(blocked.contains(&"b".to_string())); + + let mut rx = deps.subscribe(); + let first = rx.try_recv().unwrap().unwrap(); + assert_eq!(first, "c"); + + deps.complete_success(&"c".into()); + let done = rx.try_recv().unwrap(); + assert!(done.is_none()); + } + + #[test] + fn test_unknown_dep_error() { + let nodes: Vec<(String, String)> = vec![("a".into(), "a".into())]; + let edges: Vec<(String, String)> = vec![("a".into(), "nonexistent".into())]; + let result = DepsGraph::new(nodes, edges, string_key); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("unknown")); + } +} diff --git a/src/main.rs b/src/main.rs index e44311d042..811c7a95b3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,6 +36,7 @@ pub(crate) mod build_time; mod cache; mod cli; mod config; +pub(crate) mod deps_graph; mod direnv; mod dirs; pub(crate) mod duration; diff --git a/src/prepare/engine.rs b/src/prepare/engine.rs index 5c3a4ea29d..a3e90d5f69 100644 --- a/src/prepare/engine.rs +++ b/src/prepare/engine.rs @@ -1,18 +1,23 @@ -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::SystemTime; use eyre::Result; use filetime::FileTime; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; use crate::cmd::CmdLineRunner; use crate::config::config_file::ConfigFile; use crate::config::{Config, Settings}; -use crate::parallel; use crate::ui::multi_progress_report::MultiProgressReport; +type StepOutput = (PrepareStepResult, Vec); +type JobOutput = Result<(String, PrepareStepResult, Vec), (String, eyre::Report)>; + use super::PrepareProvider; +use super::prepare_deps::PrepareDeps; use super::providers::{ BunPrepareProvider, BundlerPrepareProvider, ComposerPrepareProvider, CustomPrepareProvider, GitSubmodulePrepareProvider, GoPrepareProvider, NpmPrepareProvider, PipPrepareProvider, @@ -48,6 +53,8 @@ pub enum PrepareStepResult { Fresh(String), /// Step was skipped by user request Skipped(String), + /// Step failed + Failed(String), } /// Result of running all prepare steps @@ -62,6 +69,7 @@ struct PrepareJob { cmd: super::PrepareCommand, outputs: Vec, touch: bool, + depends: Vec, } impl PrepareResult { @@ -270,12 +278,14 @@ impl PrepareEngine { .collect() } - /// Run all stale prepare steps in parallel + /// Run all stale prepare steps, respecting dependency ordering pub async fn run(&self, opts: PrepareOptions) -> Result { let mut results = vec![]; // Collect providers that need to run let mut to_run: Vec = vec![]; + // Track IDs of providers that are fresh/skipped (treated as already satisfied for deps) + let mut satisfied_ids: HashSet = HashSet::new(); for provider in &self.providers { let id = provider.id().to_string(); @@ -283,13 +293,15 @@ impl PrepareEngine { // Check auto_only filter if opts.auto_only && !provider.is_auto() { trace!("prepare step {} is not auto, skipping", id); - results.push(PrepareStepResult::Skipped(id)); + results.push(PrepareStepResult::Skipped(id.clone())); + satisfied_ids.insert(id); continue; } // Check skip list if opts.skip.contains(&id) { - results.push(PrepareStepResult::Skipped(id)); + results.push(PrepareStepResult::Skipped(id.clone())); + satisfied_ids.insert(id); continue; } @@ -297,7 +309,8 @@ impl PrepareEngine { if let Some(ref only) = opts.only && !only.contains(&id) { - results.push(PrepareStepResult::Skipped(id)); + results.push(PrepareStepResult::Skipped(id.clone())); + satisfied_ids.insert(id); continue; } @@ -311,6 +324,7 @@ impl PrepareEngine { let cmd = provider.prepare_command()?; let outputs = provider.outputs(); let touch = provider.touch_outputs(); + let depends = provider.depends(); if opts.dry_run { // Just record that it would run, let CLI handle output @@ -321,59 +335,280 @@ impl PrepareEngine { cmd, outputs, touch, + depends, }); } } else { trace!("prepare step {} is fresh, skipping", id); - results.push(PrepareStepResult::Fresh(id)); + results.push(PrepareStepResult::Fresh(id.clone())); + satisfied_ids.insert(id); } } - // Run stale providers in parallel + // Run stale providers with dependency ordering if !to_run.is_empty() { - let mpr = MultiProgressReport::get(); - let toolset_env = opts.env.clone(); + let has_deps = to_run.iter().any(|j| !j.depends.is_empty()); + + if has_deps { + let run_results = self + .run_with_deps(to_run, &satisfied_ids, &opts.env) + .await?; + for (step_result, outputs) in run_results { + for output in &outputs { + super::clear_output_stale(output); + } + results.push(step_result); + } + } else { + // No dependencies — use simple parallel execution + let run_results = self.run_parallel(to_run, &opts.env).await?; + for (step_result, outputs) in run_results { + for output in &outputs { + super::clear_output_stale(output); + } + results.push(step_result); + } + } + } + + Ok(PrepareResult { steps: results }) + } + + /// Simple parallel execution (no dependency ordering) + async fn run_parallel( + &self, + to_run: Vec, + toolset_env: &BTreeMap, + ) -> Result)>> { + let mpr = MultiProgressReport::get(); + + let to_run_with_context: Vec<_> = to_run + .into_iter() + .map(|job| (job, mpr.clone(), toolset_env.clone())) + .collect(); + + crate::parallel::parallel(to_run_with_context, |(job, mpr, toolset_env)| async move { + let pr = mpr.add(&job.cmd.description); + match Self::execute_prepare_static(&job.cmd, &toolset_env) { + Ok(()) => { + if job.touch { + Self::touch_outputs(&job.outputs); + } + pr.finish_with_message(format!("{} done", job.cmd.description)); + Ok((PrepareStepResult::Ran(job.id), job.outputs)) + } + Err(e) => { + pr.finish_with_message(format!("{} failed: {}", job.cmd.description, e)); + Err(e) + } + } + }) + .await + } - // Include mpr/env in the tuple so closure doesn't capture anything - let to_run_with_context: Vec<_> = to_run - .into_iter() - .map(|job| (job, mpr.clone(), toolset_env.clone())) + /// Dependency-aware execution using Kahn's algorithm + async fn run_with_deps( + &self, + to_run: Vec, + satisfied_ids: &HashSet, + toolset_env: &BTreeMap, + ) -> Result> { + let mpr = MultiProgressReport::get(); + let mut results: Vec = vec![]; + let mut errors: Vec<(String, String)> = vec![]; + + // Build jobs map for lookup + let running_ids: HashSet = to_run.iter().map(|j| j.id.clone()).collect(); + let mut jobs: HashMap = HashMap::new(); + let mut dep_specs: Vec<(String, Vec)> = vec![]; + + for job in to_run { + // Filter depends to only those that are actually running (not fresh/skipped) + let filtered_deps: Vec = job + .depends + .iter() + .filter(|dep| { + if satisfied_ids.contains(*dep) { + // Dependency is already satisfied (fresh/skipped) + false + } else if running_ids.contains(*dep) { + // Dependency is in the run set — need to wait + true + } else { + // Unknown dep — warn but don't block + warn!( + "prepare provider '{}' depends on '{}' which is not configured", + job.id, dep + ); + false + } + }) + .cloned() .collect(); - let run_results = - parallel::parallel(to_run_with_context, |(job, mpr, toolset_env)| async move { - let pr = mpr.add(&job.cmd.description); - match Self::execute_prepare_static(&job.cmd, &toolset_env) { - Ok(()) => { - if job.touch { - Self::touch_outputs(&job.outputs); + dep_specs.push((job.id.clone(), filtered_deps)); + jobs.insert(job.id.clone(), job); + } + + let mut deps = PrepareDeps::new(&dep_specs)?; + + // Report blocked providers (cycles) + for blocked_id in deps.blocked_providers() { + warn!( + "prepare provider '{}' is blocked due to dependency cycle", + blocked_id + ); + if let Some(job) = jobs.remove(&blocked_id) { + results.push((PrepareStepResult::Skipped(job.id), vec![])); + } + } + + let mut rx = deps.subscribe(); + let semaphore = Arc::new(Semaphore::new(Settings::get().jobs)); + let mut join_set: JoinSet = JoinSet::new(); + // Track which tokio task ID maps to which provider ID for JoinError recovery + let mut inflight: HashMap = HashMap::new(); + + loop { + tokio::select! { + biased; + + // Prioritize handling completed tasks + Some(join_result) = join_set.join_next() => { + match join_result { + Ok(Ok((id, step_result, outputs))) => { + inflight.retain(|_, v| v != &id); + results.push((step_result, outputs)); + deps.complete_success(&id); + } + Ok(Err((id, e))) => { + inflight.retain(|_, v| v != &id); + warn!("prepare provider '{}' failed: {}", id, e); + errors.push((id.clone(), e.to_string())); + results.push((PrepareStepResult::Failed(id.clone()), vec![])); + deps.complete_failure(&id); + for blocked_id in deps.blocked_providers() { + if let Some(job) = jobs.remove(&blocked_id) { + warn!( + "prepare provider '{}' skipped due to failed dependency", + job.id + ); + results.push((PrepareStepResult::Skipped(job.id), vec![])); + } } - pr.finish_with_message(format!("{} done", job.cmd.description)); - // Return outputs along with result so we can clear stale status - // after ALL providers complete successfully - Ok((PrepareStepResult::Ran(job.id), job.outputs)) } Err(e) => { - pr.finish_with_message(format!( - "{} failed: {}", - job.cmd.description, e - )); - Err(e) + // JoinError — task panicked or was cancelled + if let Some(id) = inflight.remove(&e.id()) { + warn!("prepare provider '{}' panicked: {}", id, e); + errors.push((id.clone(), e.to_string())); + results.push((PrepareStepResult::Failed(id.clone()), vec![])); + deps.complete_failure(&id); + for blocked_id in deps.blocked_providers() { + if let Some(job) = jobs.remove(&blocked_id) { + warn!( + "prepare provider '{}' skipped due to failed dependency", + job.id + ); + results.push((PrepareStepResult::Skipped(job.id), vec![])); + } + } + } else { + warn!("prepare task join error (unknown task): {e}"); + } } } - }) - .await?; + } - // All providers completed successfully - now clear stale status for all outputs - for (step_result, outputs) in run_results { - for output in &outputs { - super::clear_output_stale(output); + // Receive next ready provider + Some(maybe_id) = rx.recv() => { + let Some(id) = maybe_id else { + // None = all done + break; + }; + + let Some(job) = jobs.remove(&id) else { + continue; + }; + + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let mpr = mpr.clone(); + let toolset_env = toolset_env.clone(); + + let handle = join_set.spawn(async move { + let pr = mpr.add(&job.cmd.description); + let id = job.id; + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + Self::execute_prepare_static(&job.cmd, &toolset_env) + })); + drop(permit); + + match result { + Ok(Ok(())) => { + if job.touch { + Self::touch_outputs(&job.outputs); + } + pr.finish_with_message(format!("{} done", job.cmd.description)); + let step = PrepareStepResult::Ran(id.clone()); + Ok((id, step, job.outputs)) + } + Ok(Err(e)) => { + pr.finish_with_message(format!( + "{} failed: {}", + job.cmd.description, e + )); + Err((id, e)) + } + Err(_) => { + pr.finish_with_message(format!( + "{} panicked", + job.cmd.description + )); + Err((id, eyre::eyre!("task panicked"))) + } + } + }); + inflight.insert(handle.id(), id); } - results.push(step_result); + + else => break, } } - Ok(PrepareResult { steps: results }) + // Wait for remaining in-flight tasks + while let Some(join_result) = join_set.join_next().await { + match join_result { + Ok(Ok((id, step_result, outputs))) => { + inflight.retain(|_, v| v != &id); + results.push((step_result, outputs)); + } + Ok(Err((id, e))) => { + inflight.retain(|_, v| v != &id); + warn!("prepare provider '{}' failed: {}", id, e); + errors.push((id.clone(), e.to_string())); + results.push((PrepareStepResult::Failed(id), vec![])); + } + Err(e) => { + if let Some(id) = inflight.remove(&e.id()) { + warn!("prepare provider '{}' panicked: {}", id, e); + errors.push((id.clone(), e.to_string())); + results.push((PrepareStepResult::Failed(id), vec![])); + } else { + warn!("prepare task join error (unknown task): {e}"); + } + } + } + } + + if !errors.is_empty() { + let details = errors + .iter() + .map(|(id, msg)| format!(" {id}: {msg}")) + .collect::>() + .join("\n"); + return Err(eyre::eyre!("prepare providers failed:\n{details}")); + } + Ok(results) } /// Check if outputs are newer than sources (stateless mtime comparison) diff --git a/src/prepare/mod.rs b/src/prepare/mod.rs index 385f988973..3df9ff689f 100644 --- a/src/prepare/mod.rs +++ b/src/prepare/mod.rs @@ -12,6 +12,7 @@ pub use engine::{PrepareEngine, PrepareOptions, PrepareStepResult}; pub use rule::PrepareConfig; mod engine; +pub(crate) mod prepare_deps; pub mod providers; mod rule; @@ -95,6 +96,11 @@ pub trait PrepareProvider: Debug + Send + Sync { fn touch_outputs(&self) -> bool { self.base().touch_outputs() } + + /// Other prepare providers that must complete before this one runs + fn depends(&self) -> Vec { + self.base().config.depends.clone() + } } /// Warn if any auto-enabled prepare providers are stale diff --git a/src/prepare/prepare_deps.rs b/src/prepare/prepare_deps.rs new file mode 100644 index 0000000000..e3283e10cf --- /dev/null +++ b/src/prepare/prepare_deps.rs @@ -0,0 +1,215 @@ +use eyre::{Result, bail}; +use tokio::sync::mpsc; + +use crate::deps_graph::DepsGraph; + +/// Manages a dependency graph of prepare providers for execution scheduling. +/// Thin wrapper around `DepsGraph` with prepare-specific +/// validation and error messages. +#[derive(Debug)] +pub struct PrepareDeps { + inner: DepsGraph, +} + +impl PrepareDeps { + /// Creates a new PrepareDeps from a list of (provider_id, depends) tuples. + pub fn new(providers: &[(String, Vec)]) -> Result { + // Validate that all deps reference known providers before building the graph + let known: std::collections::HashSet<&str> = + providers.iter().map(|(id, _)| id.as_str()).collect(); + for (id, deps) in providers { + for dep in deps { + if !known.contains(dep.as_str()) { + bail!( + "prepare provider '{}' depends on unknown provider '{}'", + id, + dep + ); + } + } + } + + let nodes: Vec<(String, String)> = providers + .iter() + .map(|(id, _)| (id.clone(), id.clone())) + .collect(); + + let edges: Vec<(String, String)> = providers + .iter() + .flat_map(|(id, deps)| deps.iter().map(move |dep| (id.clone(), dep.clone()))) + .collect(); + + let inner = DepsGraph::new(nodes, edges, |s: &String| s.clone())?; + Ok(Self { inner }) + } + + /// Subscribe to receive providers that are ready to run. + pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver> { + self.inner.subscribe() + } + + /// Mark a provider as successfully completed. + pub fn complete_success(&mut self, id: &str) { + self.inner.complete_success(&id.to_string()); + } + + /// Mark a provider as failed and block all transitive dependents. + pub fn complete_failure(&mut self, id: &str) { + self.inner.complete_failure(&id.to_string()); + } + + /// Returns the list of blocked provider IDs. + pub fn blocked_providers(&self) -> Vec { + self.inner.blocked_keys() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_empty_graph() { + let _deps = PrepareDeps::new(&[]).unwrap(); + } + + #[test] + fn test_no_deps_all_ready() { + let providers = vec![ + ("npm".to_string(), vec![]), + ("pip".to_string(), vec![]), + ("go".to_string(), vec![]), + ]; + let mut deps = PrepareDeps::new(&providers).unwrap(); + let mut rx = deps.subscribe(); + + let mut emitted = vec![]; + while let Ok(Some(id)) = rx.try_recv() { + emitted.push(id); + } + assert_eq!(emitted.len(), 3); + assert!(emitted.contains(&"npm".to_string())); + assert!(emitted.contains(&"pip".to_string())); + assert!(emitted.contains(&"go".to_string())); + } + + #[test] + fn test_linear_ordering() { + let providers = vec![ + ("a".to_string(), vec![]), + ("b".to_string(), vec!["a".to_string()]), + ("c".to_string(), vec!["b".to_string()]), + ]; + let mut deps = PrepareDeps::new(&providers).unwrap(); + let mut rx = deps.subscribe(); + + let first = rx.try_recv().unwrap().unwrap(); + assert_eq!(first, "a"); + assert!(rx.try_recv().is_err()); + + deps.complete_success("a"); + let second = rx.try_recv().unwrap().unwrap(); + assert_eq!(second, "b"); + + deps.complete_success("b"); + let third = rx.try_recv().unwrap().unwrap(); + assert_eq!(third, "c"); + + deps.complete_success("c"); + let done = rx.try_recv().unwrap(); + assert!(done.is_none()); + } + + #[test] + fn test_failure_blocks_dependents() { + let providers = vec![ + ("a".to_string(), vec![]), + ("b".to_string(), vec!["a".to_string()]), + ("c".to_string(), vec!["b".to_string()]), + ("d".to_string(), vec![]), + ]; + let mut deps = PrepareDeps::new(&providers).unwrap(); + let mut rx = deps.subscribe(); + + let mut initial = vec![]; + while let Ok(Some(id)) = rx.try_recv() { + initial.push(id); + } + assert_eq!(initial.len(), 2); + assert!(initial.contains(&"a".to_string())); + assert!(initial.contains(&"d".to_string())); + + deps.complete_failure("a"); + let blocked = deps.blocked_providers(); + assert!(blocked.contains(&"b".to_string())); + assert!(blocked.contains(&"c".to_string())); + + deps.complete_success("d"); + let done = rx.try_recv().unwrap(); + assert!(done.is_none()); + } + + #[test] + fn test_cycle_detection() { + let providers = vec![ + ("a".to_string(), vec!["b".to_string()]), + ("b".to_string(), vec!["a".to_string()]), + ("c".to_string(), vec![]), + ]; + let mut deps = PrepareDeps::new(&providers).unwrap(); + + let blocked = deps.blocked_providers(); + assert!(blocked.contains(&"a".to_string())); + assert!(blocked.contains(&"b".to_string())); + + let mut rx = deps.subscribe(); + let first = rx.try_recv().unwrap().unwrap(); + assert_eq!(first, "c"); + + deps.complete_success("c"); + let done = rx.try_recv().unwrap(); + assert!(done.is_none()); + } + + #[test] + fn test_unknown_dep_error() { + let providers = vec![("a".to_string(), vec!["nonexistent".to_string()])]; + let result = PrepareDeps::new(&providers); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("unknown provider")); + } + + #[test] + fn test_diamond_deps() { + let providers = vec![ + ("d".to_string(), vec![]), + ("b".to_string(), vec!["d".to_string()]), + ("c".to_string(), vec!["d".to_string()]), + ("a".to_string(), vec!["b".to_string(), "c".to_string()]), + ]; + let mut deps = PrepareDeps::new(&providers).unwrap(); + let mut rx = deps.subscribe(); + + let first = rx.try_recv().unwrap().unwrap(); + assert_eq!(first, "d"); + assert!(rx.try_recv().is_err()); + + deps.complete_success("d"); + let mut mid = vec![]; + while let Ok(Some(id)) = rx.try_recv() { + mid.push(id); + } + assert_eq!(mid.len(), 2); + assert!(mid.contains(&"b".to_string())); + assert!(mid.contains(&"c".to_string())); + + deps.complete_success("b"); + deps.complete_success("c"); + let last = rx.try_recv().unwrap().unwrap(); + assert_eq!(last, "a"); + + deps.complete_success("a"); + let done = rx.try_recv().unwrap(); + assert!(done.is_none()); + } +} diff --git a/src/prepare/rule.rs b/src/prepare/rule.rs index bb1c0e5cc7..f2baf79427 100644 --- a/src/prepare/rule.rs +++ b/src/prepare/rule.rs @@ -46,6 +46,9 @@ pub struct PrepareProviderConfig { /// This is useful when the prepare command is a no-op (e.g., `uv sync` when all is well) /// so that the outputs appear fresh for subsequent freshness checks. pub touch_outputs: Option, + /// Other prepare providers that must complete before this one runs + #[serde(default)] + pub depends: Vec, } impl PrepareProviderConfig { diff --git a/src/toolset/tool_deps.rs b/src/toolset/tool_deps.rs index a0718d7fbc..53e3de720b 100644 --- a/src/toolset/tool_deps.rs +++ b/src/toolset/tool_deps.rs @@ -1,12 +1,9 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use eyre::Result; -use indexmap::IndexSet; -use petgraph::Direction; -use petgraph::algo::is_cyclic_directed; -use petgraph::stable_graph::{NodeIndex, StableGraph}; use tokio::sync::mpsc; +use crate::deps_graph::DepsGraph; use crate::toolset::tool_request::ToolRequest; /// Unique key for a tool request (backend full name + version) @@ -18,24 +15,11 @@ fn tool_key(tr: &ToolRequest) -> ToolKey { } /// Manages a dependency graph of tools for installation scheduling. -/// Uses Kahn's algorithm to emit tools that are ready to install -/// (i.e., all their dependencies have been installed). +/// Thin wrapper around `DepsGraph` with +/// tool-specific dependency resolution. #[derive(Debug)] pub struct ToolDeps { - /// The dependency graph where edges point from a tool to its dependencies - /// (i.e., edge A→B means "A depends on B", so B must be installed first). - /// Uses StableGraph to maintain valid node indices after removals. - graph: StableGraph, - /// Maps tool keys to their node indices in the graph - node_indices: HashMap, - /// Tools that have already been sent for installation - sent: HashSet, - /// Tools that are blocked due to dependency failures or cycles - blocked: HashSet, - /// Channel sender for emitting ready tools (None signals completion). - /// Initially created with a dummy receiver that is dropped; the real - /// receiver is created when `subscribe()` is called. - tx: mpsc::UnboundedSender>, + inner: DepsGraph, } impl ToolDeps { @@ -43,50 +27,33 @@ impl ToolDeps { /// Builds the dependency graph based on each tool's dependencies. /// Duplicate tool requests (same backend and version) are deduplicated. pub fn new(requests: Vec) -> Result { - let mut graph = StableGraph::new(); - let mut node_indices = HashMap::new(); - - // First pass: add all requested tools to the graph, deduplicating by key - for tr in &requests { - let key = tool_key(tr); - // Skip duplicates - only add the first occurrence - if node_indices.contains_key(&key) { - continue; - } - let idx = graph.add_node(tr.clone()); - node_indices.insert(key, idx); - } + // Build nodes + let nodes: Vec<(ToolKey, ToolRequest)> = requests + .iter() + .map(|tr| (tool_key(tr), tr.clone())) + .collect(); // Build a set of all tool identifiers being installed for dependency lookup let versions_hash: HashSet = requests.iter().flat_map(|tr| tr.ba().all_fulls()).collect(); - // Second pass: add edges for dependencies + // Compute edges from backend dependencies + let mut edges: Vec<(ToolKey, ToolKey)> = vec![]; for tr in &requests { let tr_key = tool_key(tr); - // Skip if this is a duplicate we didn't add - let Some(&tr_idx) = node_indices.get(&tr_key) else { - continue; - }; - // Get all dependencies for this tool if let Ok(backend) = tr.backend() && let Ok(deps) = backend.get_all_dependencies(true) { for dep_ba in deps { - // Check if this dependency is being installed let dep_fulls = dep_ba.all_fulls(); if dep_fulls.iter().any(|full| versions_hash.contains(full)) { - // Find the matching tool request in our set for other_tr in &requests { let other_fulls = other_tr.ba().all_fulls(); if dep_fulls.iter().any(|f| other_fulls.contains(f)) { let other_key = tool_key(other_tr); - if tr_key != other_key - && let Some(&other_idx) = node_indices.get(&other_key) - { - // Edge from tr to dep means "tr depends on dep" - graph.update_edge(tr_idx, other_idx, ()); + if tr_key != other_key { + edges.push((tr_key.clone(), other_key)); } } } @@ -95,216 +62,28 @@ impl ToolDeps { } } - // Create a dummy channel - the real one is created in subscribe() - let (tx, _) = mpsc::unbounded_channel(); - - let mut deps = Self { - graph, - node_indices, - sent: HashSet::new(), - blocked: HashSet::new(), - tx, - }; - - // Detect and block any cycles - deps.detect_and_block_cycles(); - - Ok(deps) + let inner = DepsGraph::new(nodes, edges, tool_key)?; + Ok(Self { inner }) } /// Subscribe to receive tools that are ready to install. - /// Returns a receiver that will emit Some(ToolRequest) for each ready tool, - /// followed by None when all tools have been processed. pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver> { - let (tx, rx) = mpsc::unbounded_channel(); - self.tx = tx; - self.emit_leaves(); - rx + self.inner.subscribe() } /// Mark a tool as successfully installed and emit any newly-ready tools. pub fn complete_success(&mut self, tr: &ToolRequest) { - let key = tool_key(tr); - self.remove_node(&key); - self.emit_leaves(); + self.inner.complete_success(&tool_key(tr)); } /// Mark a tool as failed and block all transitive dependents. pub fn complete_failure(&mut self, tr: &ToolRequest) { - let key = tool_key(tr); - - // Find and block all transitive dependents before removing the node - if let Some(&idx) = self.node_indices.get(&key) { - let dependents = self.get_transitive_dependents(idx); - for dep_idx in dependents { - if let Some(dep_tr) = self.graph.node_weight(dep_idx) { - let dep_key = tool_key(dep_tr); - self.blocked.insert(dep_key); - } - } - } - - self.remove_node(&key); - self.emit_leaves(); - } - - /// Returns whether all tools have been processed - pub fn is_empty(&self) -> bool { - self.graph.node_count() == 0 + self.inner.complete_failure(&tool_key(tr)); } /// Returns the list of blocked tools (those whose dependencies failed or are in cycles) pub fn blocked_tools(&self) -> Vec { - self.graph - .node_indices() - .filter_map(|idx| { - let tr = self.graph.node_weight(idx)?; - if self.blocked.contains(&tool_key(tr)) { - Some(tr.clone()) - } else { - None - } - }) - .collect() - } - - /// Detect cycles in the graph and mark all nodes in cycles as blocked - fn detect_and_block_cycles(&mut self) { - if !is_cyclic_directed(&self.graph) { - return; - } - - // Find all nodes that are part of cycles by checking which nodes - // have no path to a leaf (a node with out-degree 0) - let mut can_reach_leaf: HashSet = HashSet::new(); - - // Start with all leaf nodes - for idx in self.graph.node_indices() { - if self - .graph - .neighbors_directed(idx, Direction::Outgoing) - .next() - .is_none() - { - can_reach_leaf.insert(idx); - } - } - - // Propagate backwards: if a node points to a node that can reach a leaf, - // then it can also reach a leaf - let mut changed = true; - while changed { - changed = false; - for idx in self.graph.node_indices() { - if can_reach_leaf.contains(&idx) { - continue; - } - // Check if any dependency can reach a leaf - let deps_can_reach = self - .graph - .neighbors_directed(idx, Direction::Outgoing) - .all(|dep_idx| can_reach_leaf.contains(&dep_idx)); - if deps_can_reach - && self - .graph - .neighbors_directed(idx, Direction::Outgoing) - .next() - .is_some() - { - can_reach_leaf.insert(idx); - changed = true; - } - } - } - - // Any node that cannot reach a leaf is in a cycle - block it - for idx in self.graph.node_indices() { - if !can_reach_leaf.contains(&idx) - && let Some(tr) = self.graph.node_weight(idx) - { - let key = tool_key(tr); - self.blocked.insert(key); - } - } - } - - /// Emit all tools that have no remaining dependencies (leaf nodes) - fn emit_leaves(&mut self) { - let leaves = self.find_leaves(); - - for tr in leaves { - let key = tool_key(&tr); - - // Skip if already sent, blocked, or completed - if self.sent.contains(&key) || self.blocked.contains(&key) { - continue; - } - - if self.sent.insert(key) { - trace!("Scheduling tool install: {}", tr); - if let Err(e) = self.tx.send(Some(tr)) { - trace!("Error sending tool: {e:?}"); - } - } - } - - // Check if we're done - if self.is_all_done() { - trace!("All tool installations finished"); - if let Err(e) = self.tx.send(None) { - trace!("Error closing tool stream: {e:?}"); - } - } - } - - /// Find all leaf nodes (tools with no unsatisfied dependencies) - fn find_leaves(&self) -> Vec { - self.graph - .externals(Direction::Outgoing) - .filter_map(|idx| self.graph.node_weight(idx).cloned()) - .collect() - } - - /// Check if all tools have been processed (sent, completed, or blocked) - fn is_all_done(&self) -> bool { - // All done if graph is empty - if self.is_empty() { - return true; - } - - // Or if all remaining tools are blocked - self.graph.node_indices().all(|idx| { - self.graph - .node_weight(idx) - .map(|tr| self.blocked.contains(&tool_key(tr))) - .unwrap_or(true) - }) - } - - /// Remove a node from the graph by its key. - /// Uses StableGraph so other node indices remain valid. - fn remove_node(&mut self, key: &ToolKey) { - if let Some(&idx) = self.node_indices.get(key) { - self.graph.remove_node(idx); - self.node_indices.remove(key); - } - } - - /// Get all transitive dependents of a node (tools that depend on this one) - fn get_transitive_dependents(&self, start_idx: NodeIndex) -> IndexSet { - let mut dependents = IndexSet::new(); - let mut stack = vec![start_idx]; - - while let Some(idx) = stack.pop() { - // Find all nodes that have an edge TO this node (i.e., depend on it) - for neighbor in self.graph.neighbors_directed(idx, Direction::Incoming) { - if dependents.insert(neighbor) { - stack.push(neighbor); - } - } - } - - dependents + self.inner.blocked_nodes() } } @@ -314,20 +93,6 @@ mod tests { #[test] fn test_empty_deps() { - let deps = ToolDeps::new(vec![]).unwrap(); - assert!(deps.is_empty()); - } - - #[test] - fn test_find_leaves_empty_graph() { - let deps = ToolDeps::new(vec![]).unwrap(); - let leaves = deps.find_leaves(); - assert!(leaves.is_empty()); - } - - #[test] - fn test_is_all_done_empty() { - let deps = ToolDeps::new(vec![]).unwrap(); - assert!(deps.is_all_done()); + let _deps = ToolDeps::new(vec![]).unwrap(); } }