diff --git a/e2e/tasks/test_task_parallel_execution b/e2e/tasks/test_task_parallel_execution index b0f724bb84..1ec59d9517 100755 --- a/e2e/tasks/test_task_parallel_execution +++ b/e2e/tasks/test_task_parallel_execution @@ -10,6 +10,9 @@ temp_log=$(mktemp) trap 'rm -f $temp_log' EXIT cat <mise.toml +[tools] +poetry = "latest" + [tasks.process1] run = ''' echo "\$(date +%s%N) process1 start" >> $temp_log @@ -62,94 +65,106 @@ echo "process6 complete" run = "mise run process1 ::: process2 ::: process3 ::: process4 ::: process5 ::: process6" EOF -echo "Testing parallel task execution..." - -# Record start time for overall execution time measurement -start_time=$(date +%s) - -# Run the tasks and capture output -task_output=$(mise run run-all) - -# Record end time -end_time=$(date +%s) -total_time=$((end_time - start_time)) - -echo "Total execution time: ${total_time}s" - -# Read the log file to analyze timing -echo "Task execution log:" -cat "$temp_log" | sort - -# Parse start times to check if tasks started concurrently -mapfile -t start_times < <(grep "start" "$temp_log" | awk '{print $1}' | sort -n) -mapfile -t end_times < <(grep "end" "$temp_log" | awk '{print $1}' | sort -n) - -echo "Analyzing task start times..." - -if [ ${#start_times[@]} -ne 6 ]; then - echo "ERROR: Expected 6 task start times, got ${#start_times[@]}" - exit 1 -fi - -if [ ${#end_times[@]} -ne 6 ]; then - echo "ERROR: Expected 6 task end times, got ${#end_times[@]}" - exit 1 -fi - -# Calculate time differences between task starts (in nanoseconds) -# For parallel execution, all tasks should start within a small window (< 1 second) -first_start=${start_times[0]} -last_start=${start_times[5]} -start_spread=$((last_start - first_start)) - -# Convert nanoseconds to milliseconds for easier interpretation -start_spread_ms=$((start_spread / 1000000)) - -echo "Time between first and last task start: ${start_spread_ms}ms" - -# Check if tasks started in parallel (within 1 second = 1000ms) -if [ $start_spread_ms -gt 1000 ]; then - echo "WARNING: Tasks may not be running in parallel. Start spread: ${start_spread_ms}ms" - echo "This could indicate the original issue is still present." -else - echo "✓ Tasks started concurrently (within ${start_spread_ms}ms)" -fi - -# Critical check: Ensure all tasks start before any task completes -# This is essential for true parallel execution -echo "Checking that all tasks start before any complete..." -last_start_time=${start_times[5]} -first_end_time=${end_times[0]} - -if [ "$last_start_time" -gt "$first_end_time" ]; then - echo "ERROR: Some tasks completed before all tasks started!" - echo "Last task started at: $last_start_time" - echo "First task completed at: $first_end_time" - echo "This indicates sequential execution, not parallel execution." - exit 1 -else - echo "✓ All tasks started before any completed - confirming parallel execution" -fi - -# Check total execution time -# If running sequentially: 6 tasks * 2 seconds = ~12 seconds -# If running in parallel: ~2-3 seconds (plus overhead) -if [ $total_time -gt 8 ]; then - echo "ERROR: Total execution time too long (${total_time}s). Tasks appear to be running sequentially." - echo "Expected: ~2-4 seconds for parallel execution" - echo "Got: ${total_time}s (suggests sequential execution)" - exit 1 -else - echo "✓ Total execution time acceptable (${total_time}s) - suggests parallel execution" -fi - -# Verify all tasks completed successfully using the captured output -for i in {1..6}; do - if ! grep -q "process${i} complete" <<<"$task_output"; then - echo "ERROR: process${i} did not complete successfully" +assert "mise install" + +echo "Testing parallel task execution (running 3 times for reliability)..." + +for test_run in {1..3}; do + echo "" + echo "=== Test Run $test_run/3 ===" + + # Clear the log file for each run + rm -f "$temp_log" + + # Record start time for overall execution time measurement + start_time=$(date +%s) + + # Run the tasks and capture output + task_output=$(mise run run-all) + + # Record end time + end_time=$(date +%s) + total_time=$((end_time - start_time)) + + echo "Total execution time: ${total_time}s" + + # Read the log file to analyze timing + echo "Task execution log:" + cat "$temp_log" | sort + + # Parse start times to check if tasks started concurrently + mapfile -t start_times < <(grep "start" "$temp_log" | awk '{print $1}' | sort -n) + mapfile -t end_times < <(grep "end" "$temp_log" | awk '{print $1}' | sort -n) + + echo "Analyzing task start times..." + + if [ ${#start_times[@]} -ne 6 ]; then + echo "ERROR: Expected 6 task start times, got ${#start_times[@]}" + exit 1 + fi + + if [ ${#end_times[@]} -ne 6 ]; then + echo "ERROR: Expected 6 task end times, got ${#end_times[@]}" + exit 1 + fi + + # Calculate time differences between task starts (in nanoseconds) + # For parallel execution, all tasks should start within a small window (< 1 second) + first_start=${start_times[0]} + last_start=${start_times[5]} + start_spread=$((last_start - first_start)) + + # Convert nanoseconds to milliseconds for easier interpretation + start_spread_ms=$((start_spread / 1000000)) + + echo "Time between first and last task start: ${start_spread_ms}ms" + + # Check if tasks started in parallel (within 1 second = 1000ms) + if [ $start_spread_ms -gt 1000 ]; then + echo "WARNING: Tasks may not be running in parallel. Start spread: ${start_spread_ms}ms" + echo "This could indicate the original issue is still present." + else + echo "✓ Tasks started concurrently (within ${start_spread_ms}ms)" + fi + + # Critical check: Ensure all tasks start before any task completes + # This is essential for true parallel execution + echo "Checking that all tasks start before any complete..." + last_start_time=${start_times[5]} + first_end_time=${end_times[0]} + + if [ "$last_start_time" -gt "$first_end_time" ]; then + echo "ERROR: Some tasks completed before all tasks started!" + echo "Last task started at: $last_start_time" + echo "First task completed at: $first_end_time" + echo "This indicates sequential execution, not parallel execution." exit 1 + else + echo "✓ All tasks started before any completed - confirming parallel execution" fi + + # Check total execution time + # If running sequentially: 6 tasks * 2 seconds = ~12 seconds + # If running in parallel: ~2-3 seconds (plus overhead) + if [ $total_time -gt 8 ]; then + echo "ERROR: Total execution time too long (${total_time}s). Tasks appear to be running sequentially." + echo "Expected: ~2-4 seconds for parallel execution" + echo "Got: ${total_time}s (suggests sequential execution)" + exit 1 + else + echo "✓ Total execution time acceptable (${total_time}s) - suggests parallel execution" + fi + + # Verify all tasks completed successfully using the captured output + for i in {1..6}; do + if ! grep -q "process${i} complete" <<<"$task_output"; then + echo "ERROR: process${i} did not complete successfully" + exit 1 + fi + done + + echo "✓ All tasks completed successfully in test run $test_run" done -echo "✓ All tasks completed successfully" -echo "✓ Parallel task execution test passed" +echo "" +echo "✓ All 3 test runs passed - Parallel task execution test passed" diff --git a/src/backend/asdf.rs b/src/backend/asdf.rs index 5c819e9568..fd0246bd17 100644 --- a/src/backend/asdf.rs +++ b/src/backend/asdf.rs @@ -387,6 +387,7 @@ impl Backend for AsdfBackend { ts: &Toolset, tv: &ToolVersion, ) -> eyre::Result { + let total_start = std::time::Instant::now(); if matches!(tv.request, ToolRequest::System { .. }) { return Ok(BTreeMap::new()); } @@ -395,11 +396,18 @@ impl Backend for AsdfBackend { // the second is to prevent infinite loops return Ok(BTreeMap::new()); } - self.cache + let res = self + .cache .exec_env(config, self, tv, async || { self.fetch_exec_env(config, ts, tv).await }) - .await + .await; + trace!( + "exec_env cache.get_or_try_init_async for {} finished in {}ms", + self.name, + total_start.elapsed().as_millis() + ); + res } } diff --git a/src/backend/external_plugin_cache.rs b/src/backend/external_plugin_cache.rs index e4317d30d7..64370f2b69 100644 --- a/src/backend/external_plugin_cache.rs +++ b/src/backend/external_plugin_cache.rs @@ -1,20 +1,19 @@ use crate::backend::asdf::AsdfBackend; use crate::cache::{CacheManager, CacheManagerBuilder}; use crate::config::Config; -use crate::dirs; use crate::env; use crate::env_diff::EnvMap; use crate::hash::hash_to_str; use crate::tera::{BASE_CONTEXT, get_tera}; use crate::toolset::{ToolRequest, ToolVersion}; +use dashmap::DashMap; use eyre::{WrapErr, eyre}; -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::RwLock; +use std::sync::Arc; #[derive(Debug, Default)] pub struct ExternalPluginCache { - list_bin_paths: RwLock>>>, - exec_env: RwLock>>, + list_bin_paths: DashMap>>, + exec_env: DashMap>, } impl ExternalPluginCache { @@ -29,22 +28,31 @@ impl ExternalPluginCache { Fut: Future>>, F: FnOnce() -> Fut, { - let mut w = self.list_bin_paths.write().await; - let cm = w.entry(tv.request.clone()).or_insert_with(|| { - let list_bin_paths_filename = match &plugin.toml.list_bin_paths.cache_key { - Some(key) => { - let key = render_cache_key(config, tv, key); - let filename = format!("{key}.msgpack.z"); - tv.cache_path().join("list_bin_paths").join(filename) - } - None => tv.cache_path().join("list_bin_paths.msgpack.z"), - }; - CacheManagerBuilder::new(list_bin_paths_filename) - .with_fresh_file(plugin.plugin_path.clone()) - .with_fresh_file(tv.install_path()) - .build() - }); - cm.get_or_try_init_async(fetch).await.cloned() + let cm = self + .list_bin_paths + .entry(tv.request.clone()) + .or_insert_with(|| { + let list_bin_paths_filename = match &plugin.toml.list_bin_paths.cache_key { + Some(key) => { + let key = render_cache_key(config, tv, key); + let filename = format!("{key}.msgpack.z"); + tv.cache_path().join("list_bin_paths").join(filename) + } + None => tv.cache_path().join("list_bin_paths.msgpack.z"), + }; + CacheManagerBuilder::new(list_bin_paths_filename) + .with_fresh_file(plugin.plugin_path.clone()) + .with_fresh_file(tv.install_path()) + .build() + }); + let start = std::time::Instant::now(); + let res = cm.get_or_try_init_async(fetch).await.cloned(); + trace!( + "external_plugin_cache.list_bin_paths for {} took {}ms", + plugin.name, + start.elapsed().as_millis() + ); + res } pub async fn exec_env( @@ -58,8 +66,7 @@ impl ExternalPluginCache { Fut: Future>, F: FnOnce() -> Fut, { - let mut w = self.exec_env.write().await; - let cm = w.entry(tv.request.clone()).or_insert_with(|| { + let cm = self.exec_env.entry(tv.request.clone()).or_insert_with(|| { let exec_env_filename = match &plugin.toml.exec_env.cache_key { Some(key) => { let key = render_cache_key(config, tv, key); @@ -69,12 +76,18 @@ impl ExternalPluginCache { None => tv.cache_path().join("exec_env.msgpack.z"), }; CacheManagerBuilder::new(exec_env_filename) - .with_fresh_file(dirs::DATA.to_path_buf()) .with_fresh_file(plugin.plugin_path.clone()) .with_fresh_file(tv.install_path()) .build() }); - cm.get_or_try_init_async(fetch).await.cloned() + let start = std::time::Instant::now(); + let res = cm.get_or_try_init_async(fetch).await.cloned(); + trace!( + "external_plugin_cache.exec_env for {} took {}ms", + plugin.name, + start.elapsed().as_millis() + ); + res } } diff --git a/src/cli/run.rs b/src/cli/run.rs index 05b08dbaf1..362b0bba25 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -415,7 +415,13 @@ impl Run { } let jset = jset.clone(); let this_ = this.clone(); + let wait_start = std::time::Instant::now(); let permit = semaphore.clone().acquire_owned().await?; + trace!( + "semaphore acquired for {} after {}ms", + task.name, + wait_start.elapsed().as_millis() + ); let config = config.clone(); let sched_tx = sched_tx.clone(); in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); @@ -482,7 +488,13 @@ impl Run { if this.is_stopping() { break; } let jset = jset.clone(); let this_ = this.clone(); + let wait_start = std::time::Instant::now(); let permit = semaphore.clone().acquire_owned().await?; + trace!( + "semaphore acquired for {} after {}ms", + task.name, + wait_start.elapsed().as_millis() + ); let config = config.clone(); let sched_tx = sched_tx.clone(); in_flight.fetch_add(1, std::sync::atomic::Ordering::SeqCst); @@ -592,6 +604,7 @@ impl Run { sched_tx: Arc>)>>, ) -> Result<()> { let prefix = task.estyled_prefix(); + let total_start = std::time::Instant::now(); if Settings::get().task_skip.contains(&task.name) { if !self.quiet(Some(task)) { self.eprint(task, &prefix, "skipping task"); @@ -614,11 +627,23 @@ impl Run { for (k, v) in &task.tools { tools.push(format!("{k}@{v}").parse()?); } + let ts_build_start = std::time::Instant::now(); let ts = ToolsetBuilder::new() .with_args(&tools) .build(config) .await?; + trace!( + "task {} ToolsetBuilder::build took {}ms", + task.name, + ts_build_start.elapsed().as_millis() + ); + let env_render_start = std::time::Instant::now(); let mut env = task.render_env(config, &ts).await?; + trace!( + "task {} render_env took {}ms", + task.name, + env_render_start.elapsed().as_millis() + ); let output = self.output(Some(task)); env.insert("MISE_TASK_OUTPUT".into(), output.to_string()); if !self.timings { @@ -642,7 +667,14 @@ impl Run { let timer = std::time::Instant::now(); if let Some(file) = &task.file { + let exec_start = std::time::Instant::now(); self.exec_file(config, file, task, &env, &prefix).await?; + trace!( + "task {} exec_file took {}ms (total {}ms)", + task.name, + exec_start.elapsed().as_millis(), + total_start.elapsed().as_millis() + ); } else { let rendered_run_scripts = task .render_run_scripts_with_args(config, self.cd.clone(), &task.args, &env) @@ -658,8 +690,15 @@ impl Run { self.parse_usage_spec_and_init_env(config, task, &mut env, get_args) .await?; + let exec_start = std::time::Instant::now(); self.exec_task_run_entries(config, task, &env, &prefix, rendered_run_scripts, sched_tx) .await?; + trace!( + "task {} exec_task_run_entries took {}ms (total {}ms)", + task.name, + exec_start.elapsed().as_millis(), + total_start.elapsed().as_millis() + ); } if self.task_timings() diff --git a/src/toolset/mod.rs b/src/toolset/mod.rs index a8afee63a5..4e1e9621b3 100644 --- a/src/toolset/mod.rs +++ b/src/toolset/mod.rs @@ -20,11 +20,13 @@ use crate::{backend, config, env, hooks}; use crate::{backend::Backend, parallel}; pub use builder::ToolsetBuilder; use console::truncate_str; +use dashmap::DashMap; use eyre::{Result, WrapErr}; use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; use outdated_info::OutdatedInfo; pub use outdated_info::is_outdated_version; +use std::sync::LazyLock as Lazy; use tokio::sync::OnceCell; use tokio::{sync::Semaphore, task::JoinSet}; pub use tool_request::ToolRequest; @@ -45,6 +47,10 @@ mod tool_version_options; pub use tool_version_options::{ToolVersionOptions, parse_tool_options}; +// Cache Toolset::list_paths results across identical toolsets within a process. +// Keyed by project_root plus sorted list of backend@version pairs currently installed. +static LIST_PATHS_CACHE: Lazy>> = Lazy::new(DashMap::new); + #[derive(Debug, Clone)] pub struct InstallOptions { pub reason: String, @@ -748,14 +754,40 @@ impl Toolset { Ok((env, env_results)) } pub async fn list_paths(&self, config: &Arc) -> Vec { - let mut paths = vec![]; + // Build a stable cache key based on project_root and current installed versions + let mut key_parts = vec![]; + if let Some(root) = &config.project_root { + key_parts.push(root.to_string_lossy().to_string()); + } + let mut installed: Vec = self + .list_current_installed_versions(config) + .into_iter() + .map(|(p, tv)| format!("{}@{}", p.id(), tv.version)) + .collect(); + installed.sort(); + key_parts.extend(installed); + let cache_key = key_parts.join("|"); + if let Some(entry) = LIST_PATHS_CACHE.get(&cache_key) { + trace!("toolset.list_paths hit cache"); + return entry.clone(); + } + + let mut paths: Vec = Vec::new(); for (p, tv) in self.list_current_installed_versions(config).into_iter() { - paths.extend(p.list_bin_paths(config, &tv).await.unwrap_or_else(|e| { + let start = std::time::Instant::now(); + let new_paths = p.list_bin_paths(config, &tv).await.unwrap_or_else(|e| { warn!("Error listing bin paths for {tv}: {e:#}"); Vec::new() - })); + }); + trace!( + "toolset.list_paths {}@{} list_bin_paths took {}ms", + p.id(), + tv.version, + start.elapsed().as_millis() + ); + paths.extend(new_paths); } - + LIST_PATHS_CACHE.insert(cache_key, paths.clone()); paths .into_iter() .filter(|p| p.parent().is_some()) // TODO: why?