Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 102 additions & 87 deletions e2e/tasks/test_task_parallel_execution
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ temp_log=$(mktemp)
trap 'rm -f $temp_log' EXIT

cat <<EOF >mise.toml
[tools]
poetry = "latest"

[tasks.process1]
run = '''
echo "\$(date +%s%N) process1 start" >> $temp_log
Expand Down Expand Up @@ -62,94 +65,106 @@ echo "process6 complete"
run = "mise run process1 ::: process2 ::: process3 ::: process4 ::: process5 ::: process6"
EOF

echo "Testing parallel task execution..."

# Record start time for overall execution time measurement
start_time=$(date +%s)

# Run the tasks and capture output
task_output=$(mise run run-all)

# Record end time
end_time=$(date +%s)
total_time=$((end_time - start_time))

echo "Total execution time: ${total_time}s"

# Read the log file to analyze timing
echo "Task execution log:"
cat "$temp_log" | sort

# Parse start times to check if tasks started concurrently
mapfile -t start_times < <(grep "start" "$temp_log" | awk '{print $1}' | sort -n)
mapfile -t end_times < <(grep "end" "$temp_log" | awk '{print $1}' | sort -n)

echo "Analyzing task start times..."

if [ ${#start_times[@]} -ne 6 ]; then
echo "ERROR: Expected 6 task start times, got ${#start_times[@]}"
exit 1
fi

if [ ${#end_times[@]} -ne 6 ]; then
echo "ERROR: Expected 6 task end times, got ${#end_times[@]}"
exit 1
fi

# Calculate time differences between task starts (in nanoseconds)
# For parallel execution, all tasks should start within a small window (< 1 second)
first_start=${start_times[0]}
last_start=${start_times[5]}
start_spread=$((last_start - first_start))

# Convert nanoseconds to milliseconds for easier interpretation
start_spread_ms=$((start_spread / 1000000))

echo "Time between first and last task start: ${start_spread_ms}ms"

# Check if tasks started in parallel (within 1 second = 1000ms)
if [ $start_spread_ms -gt 1000 ]; then
echo "WARNING: Tasks may not be running in parallel. Start spread: ${start_spread_ms}ms"
echo "This could indicate the original issue is still present."
else
echo "✓ Tasks started concurrently (within ${start_spread_ms}ms)"
fi

# Critical check: Ensure all tasks start before any task completes
# This is essential for true parallel execution
echo "Checking that all tasks start before any complete..."
last_start_time=${start_times[5]}
first_end_time=${end_times[0]}

if [ "$last_start_time" -gt "$first_end_time" ]; then
echo "ERROR: Some tasks completed before all tasks started!"
echo "Last task started at: $last_start_time"
echo "First task completed at: $first_end_time"
echo "This indicates sequential execution, not parallel execution."
exit 1
else
echo "✓ All tasks started before any completed - confirming parallel execution"
fi

# Check total execution time
# If running sequentially: 6 tasks * 2 seconds = ~12 seconds
# If running in parallel: ~2-3 seconds (plus overhead)
if [ $total_time -gt 8 ]; then
echo "ERROR: Total execution time too long (${total_time}s). Tasks appear to be running sequentially."
echo "Expected: ~2-4 seconds for parallel execution"
echo "Got: ${total_time}s (suggests sequential execution)"
exit 1
else
echo "✓ Total execution time acceptable (${total_time}s) - suggests parallel execution"
fi

# Verify all tasks completed successfully using the captured output
for i in {1..6}; do
if ! grep -q "process${i} complete" <<<"$task_output"; then
echo "ERROR: process${i} did not complete successfully"
assert "mise install"

echo "Testing parallel task execution (running 3 times for reliability)..."

# Run the parallel-execution check 3 times to guard against flaky timing results.
# Each iteration: clears the shared log, runs all 6 tasks via `mise run run-all`,
# then verifies (1) all tasks started within a 1s window, (2) every task started
# before any finished, and (3) total wall time is consistent with parallelism.
for test_run in {1..3}; do
    echo ""
    echo "=== Test Run $test_run/3 ==="

    # Clear the log file for each run so timestamps from the previous
    # iteration cannot leak into this run's analysis.
    rm -f "$temp_log"

    # Record start time for overall execution time measurement (whole seconds).
    start_time=$(date +%s)

    # Run the tasks and capture output for the per-task completion check below.
    task_output=$(mise run run-all)

    # Record end time
    end_time=$(date +%s)
    total_time=$((end_time - start_time))

    echo "Total execution time: ${total_time}s"

    # Read the log file to analyze timing.
    # (was `cat "$temp_log" | sort` — useless use of cat, SC2002)
    echo "Task execution log:"
    sort "$temp_log"

    # Parse start/end times to check if tasks ran concurrently.
    # Each log line is "<epoch-nanoseconds> processN start|end"; awk takes the
    # timestamp column and `sort -n` orders them so [0] is earliest, [5] latest.
    mapfile -t start_times < <(grep "start" "$temp_log" | awk '{print $1}' | sort -n)
    mapfile -t end_times < <(grep "end" "$temp_log" | awk '{print $1}' | sort -n)

    echo "Analyzing task start times..."

    # Sanity check: all 6 tasks must have logged both a start and an end.
    if [ ${#start_times[@]} -ne 6 ]; then
        echo "ERROR: Expected 6 task start times, got ${#start_times[@]}"
        exit 1
    fi

    if [ ${#end_times[@]} -ne 6 ]; then
        echo "ERROR: Expected 6 task end times, got ${#end_times[@]}"
        exit 1
    fi

    # Calculate time differences between task starts (in nanoseconds)
    # For parallel execution, all tasks should start within a small window (< 1 second)
    first_start=${start_times[0]}
    last_start=${start_times[5]}
    start_spread=$((last_start - first_start))

    # Convert nanoseconds to milliseconds for easier interpretation
    start_spread_ms=$((start_spread / 1000000))

    echo "Time between first and last task start: ${start_spread_ms}ms"

    # Check if tasks started in parallel (within 1 second = 1000ms).
    # Warning only (not fatal): start spread can be inflated by slow CI hosts.
    if [ "$start_spread_ms" -gt 1000 ]; then
        echo "WARNING: Tasks may not be running in parallel. Start spread: ${start_spread_ms}ms"
        echo "This could indicate the original issue is still present."
    else
        echo "✓ Tasks started concurrently (within ${start_spread_ms}ms)"
    fi

    # Critical check: Ensure all tasks start before any task completes.
    # This is essential for true parallel execution — under sequential execution
    # the first task would end before the last one starts.
    echo "Checking that all tasks start before any complete..."
    last_start_time=${start_times[5]}
    first_end_time=${end_times[0]}

    if [ "$last_start_time" -gt "$first_end_time" ]; then
        echo "ERROR: Some tasks completed before all tasks started!"
        echo "Last task started at: $last_start_time"
        echo "First task completed at: $first_end_time"
        echo "This indicates sequential execution, not parallel execution."
        exit 1
    else
        echo "✓ All tasks started before any completed - confirming parallel execution"
    fi

    # Check total execution time
    # If running sequentially: 6 tasks * 2 seconds = ~12 seconds
    # If running in parallel: ~2-3 seconds (plus overhead)
    if [ "$total_time" -gt 8 ]; then
        echo "ERROR: Total execution time too long (${total_time}s). Tasks appear to be running sequentially."
        echo "Expected: ~2-4 seconds for parallel execution"
        echo "Got: ${total_time}s (suggests sequential execution)"
        exit 1
    else
        echo "✓ Total execution time acceptable (${total_time}s) - suggests parallel execution"
    fi

    # Verify all tasks completed successfully using the captured output
    for i in {1..6}; do
        if ! grep -q "process${i} complete" <<<"$task_output"; then
            echo "ERROR: process${i} did not complete successfully"
            exit 1
        fi
    done

    echo "✓ All tasks completed successfully in test run $test_run"
done

echo "✓ All tasks completed successfully"
echo "✓ Parallel task execution test passed"
echo ""
echo "✓ All 3 test runs passed - Parallel task execution test passed"
12 changes: 10 additions & 2 deletions src/backend/asdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ impl Backend for AsdfBackend {
ts: &Toolset,
tv: &ToolVersion,
) -> eyre::Result<EnvMap> {
let total_start = std::time::Instant::now();
if matches!(tv.request, ToolRequest::System { .. }) {
return Ok(BTreeMap::new());
}
Expand All @@ -395,11 +396,18 @@ impl Backend for AsdfBackend {
// the second is to prevent infinite loops
return Ok(BTreeMap::new());
}
self.cache
let res = self
.cache
.exec_env(config, self, tv, async || {
self.fetch_exec_env(config, ts, tv).await
})
.await
.await;
trace!(
"exec_env cache.get_or_try_init_async for {} finished in {}ms",
self.name,
total_start.elapsed().as_millis()
);
res
}
}

Expand Down
63 changes: 38 additions & 25 deletions src/backend/external_plugin_cache.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use crate::backend::asdf::AsdfBackend;
use crate::cache::{CacheManager, CacheManagerBuilder};
use crate::config::Config;
use crate::dirs;
use crate::env;
use crate::env_diff::EnvMap;
use crate::hash::hash_to_str;
use crate::tera::{BASE_CONTEXT, get_tera};
use crate::toolset::{ToolRequest, ToolVersion};
use dashmap::DashMap;
use eyre::{WrapErr, eyre};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use std::sync::Arc;

#[derive(Debug, Default)]
pub struct ExternalPluginCache {
list_bin_paths: RwLock<HashMap<ToolRequest, CacheManager<Vec<String>>>>,
exec_env: RwLock<HashMap<ToolRequest, CacheManager<EnvMap>>>,
list_bin_paths: DashMap<ToolRequest, CacheManager<Vec<String>>>,
exec_env: DashMap<ToolRequest, CacheManager<EnvMap>>,
}

impl ExternalPluginCache {
Expand All @@ -29,22 +28,31 @@ impl ExternalPluginCache {
Fut: Future<Output = eyre::Result<Vec<String>>>,
F: FnOnce() -> Fut,
{
let mut w = self.list_bin_paths.write().await;
let cm = w.entry(tv.request.clone()).or_insert_with(|| {
let list_bin_paths_filename = match &plugin.toml.list_bin_paths.cache_key {
Some(key) => {
let key = render_cache_key(config, tv, key);
let filename = format!("{key}.msgpack.z");
tv.cache_path().join("list_bin_paths").join(filename)
}
None => tv.cache_path().join("list_bin_paths.msgpack.z"),
};
CacheManagerBuilder::new(list_bin_paths_filename)
.with_fresh_file(plugin.plugin_path.clone())
.with_fresh_file(tv.install_path())
.build()
});
cm.get_or_try_init_async(fetch).await.cloned()
let cm = self
.list_bin_paths
.entry(tv.request.clone())
.or_insert_with(|| {
let list_bin_paths_filename = match &plugin.toml.list_bin_paths.cache_key {
Some(key) => {
let key = render_cache_key(config, tv, key);
let filename = format!("{key}.msgpack.z");
tv.cache_path().join("list_bin_paths").join(filename)
}
None => tv.cache_path().join("list_bin_paths.msgpack.z"),
};
CacheManagerBuilder::new(list_bin_paths_filename)
.with_fresh_file(plugin.plugin_path.clone())
.with_fresh_file(tv.install_path())
.build()
});
let start = std::time::Instant::now();
let res = cm.get_or_try_init_async(fetch).await.cloned();
trace!(
"external_plugin_cache.list_bin_paths for {} took {}ms",
plugin.name,
start.elapsed().as_millis()
);
res
}

pub async fn exec_env<F, Fut>(
Expand All @@ -58,8 +66,7 @@ impl ExternalPluginCache {
Fut: Future<Output = eyre::Result<EnvMap>>,
F: FnOnce() -> Fut,
{
let mut w = self.exec_env.write().await;
let cm = w.entry(tv.request.clone()).or_insert_with(|| {
let cm = self.exec_env.entry(tv.request.clone()).or_insert_with(|| {
let exec_env_filename = match &plugin.toml.exec_env.cache_key {
Some(key) => {
let key = render_cache_key(config, tv, key);
Expand All @@ -69,12 +76,18 @@ impl ExternalPluginCache {
None => tv.cache_path().join("exec_env.msgpack.z"),
};
CacheManagerBuilder::new(exec_env_filename)
.with_fresh_file(dirs::DATA.to_path_buf())
.with_fresh_file(plugin.plugin_path.clone())
.with_fresh_file(tv.install_path())
.build()
});
cm.get_or_try_init_async(fetch).await.cloned()
let start = std::time::Instant::now();
let res = cm.get_or_try_init_async(fetch).await.cloned();
trace!(
"external_plugin_cache.exec_env for {} took {}ms",
plugin.name,
start.elapsed().as_millis()
);
res
}
}

Expand Down
Loading
Loading