From 8320bf9b25b527bec018b3cba0e86f6e288eb090 Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Tue, 1 Jul 2025 14:09:55 +1000 Subject: [PATCH 01/14] parallel execution --- crates/goose/src/agents/agent.rs | 4 + crates/goose/src/agents/mod.rs | 1 + .../parallel_execution_tool/executor.rs | 78 +++++++++++++ .../src/agents/parallel_execution_tool/lib.rs | 26 +++++ .../src/agents/parallel_execution_tool/mod.rs | 6 + .../parallel_run_task_tool.rs | 55 +++++++++ .../agents/parallel_execution_tool/tasks.rs | 110 ++++++++++++++++++ .../agents/parallel_execution_tool/types.rs | 63 ++++++++++ .../agents/parallel_execution_tool/workers.rs | 96 +++++++++++++++ 9 files changed, 439 insertions(+) create mode 100644 crates/goose/src/agents/parallel_execution_tool/executor.rs create mode 100644 crates/goose/src/agents/parallel_execution_tool/lib.rs create mode 100644 crates/goose/src/agents/parallel_execution_tool/mod.rs create mode 100644 crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs create mode 100644 crates/goose/src/agents/parallel_execution_tool/tasks.rs create mode 100644 crates/goose/src/agents/parallel_execution_tool/types.rs create mode 100644 crates/goose/src/agents/parallel_execution_tool/workers.rs diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index c2a5dbdb30ba..dcaff2d55ac3 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -10,6 +10,7 @@ use futures_util::stream; use futures_util::stream::StreamExt; use mcp_core::protocol::JsonRpcMessage; +use crate::agents::parallel_execution_tool::parallel_run_task_tool::{self, PARALLEL_RUN_TASK_TOOL_NAME_PREFIX}; use crate::agents::sub_recipe_manager::SubRecipeManager; use crate::config::{Config, ExtensionConfigManager, PermissionManager}; use crate::message::Message; @@ -265,6 +266,8 @@ impl Agent { sub_recipe_manager .dispatch_sub_recipe_tool_call(&tool_call.name, tool_call.arguments.clone()) .await + } else if tool_call.name == PARALLEL_RUN_TASK_TOOL_NAME_PREFIX { + parallel_run_task_tool::run_tasks(tool_call.arguments.clone()).await } else if tool_call.name == PLATFORM_READ_RESOURCE_TOOL_NAME { // Check if the tool is read_resource and handle it separately ToolCallResult::from( @@ -545,6 +548,7 @@ impl Agent { let sub_recipe_manager = self.sub_recipe_manager.lock().await; prefixed_tools.extend(sub_recipe_manager.sub_recipe_tools.values().cloned()); } + prefixed_tools.push(parallel_run_task_tool::create_parallel_run_task_tool()); prefixed_tools } diff --git a/crates/goose/src/agents/mod.rs b/crates/goose/src/agents/mod.rs index 6b4a6e9f3e47..d793651e3170 100644 --- a/crates/goose/src/agents/mod.rs +++ b/crates/goose/src/agents/mod.rs @@ -3,6 +3,7 @@ mod context; pub mod extension; pub mod extension_manager; mod large_response_handler; +pub mod parallel_execution_tool; pub mod platform_tools; pub mod prompt_manager; mod recipe_tools; diff --git a/crates/goose/src/agents/parallel_execution_tool/executor.rs b/crates/goose/src/agents/parallel_execution_tool/executor.rs new file mode 100644 index 000000000000..f2f67ba0d1dd --- /dev/null +++ b/crates/goose/src/agents/parallel_execution_tool/executor.rs @@ -0,0 +1,78 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, AtomicBool}; +use tokio::sync::mpsc; +use tokio::time::Instant; + +use crate::agents::parallel_execution_tool::lib::{Config, ExecutionResponse, ExecutionStats, Task, TaskResult}; +use crate::agents::parallel_execution_tool::workers::{run_scaler, spawn_worker, SharedState}; + +// Main parallel execution function +pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResponse { + let start_time = Instant::now(); + let task_count = tasks.len(); + + // Create channels + let (task_tx, task_rx) = mpsc::channel::(task_count); + let (result_tx, mut result_rx) = mpsc::channel::(task_count); + + // Initialize shared state + let shared_state = Arc::new(SharedState { + task_sender: task_tx.clone(), + task_receiver: Arc::new(tokio::sync::Mutex::new(task_rx)), + result_sender: result_tx, + active_workers: Arc::new(AtomicUsize::new(0)), + should_stop: Arc::new(AtomicBool::new(false)), + total_tasks: Arc::new(AtomicUsize::new(task_count)), + completed_tasks: Arc::new(AtomicUsize::new(0)), + }); + + // Send all tasks to the queue + for task in tasks.clone() { + let _ = task_tx.send(task).await; + } + // Close sender so workers know when queue is empty + drop(task_tx); + + // Start initial workers + for i in 0..config.initial_workers { + spawn_worker(shared_state.clone(), i, config.timeout_seconds); + } + + // Start the scaler + let scaler_state = shared_state.clone(); + let scaler_handle = tokio::spawn(async move { + run_scaler(scaler_state, task_count, config.max_workers, config.timeout_seconds).await; + }); + + // Collect results + let mut results = Vec::new(); + while let Some(result) = result_rx.recv().await { + results.push(result); + if results.len() >= task_count { + break; + } + } + + // Wait for scaler to finish + let _ = scaler_handle.await; + + // Calculate stats + let execution_time = start_time.elapsed().as_millis(); + let completed = results.iter() + .filter(|r| r.status == "success") + .count(); + let failed = results.iter() + .filter(|r| r.status == "failed") + .count(); + + ExecutionResponse { + status: "completed".to_string(), + results, + stats: ExecutionStats { + total_tasks: task_count, + completed, + failed, + execution_time_ms: execution_time, + }, + } +} \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/lib.rs b/crates/goose/src/agents/parallel_execution_tool/lib.rs new file mode 100644 index 000000000000..1d3bcae793cb --- /dev/null +++ b/crates/goose/src/agents/parallel_execution_tool/lib.rs @@ -0,0 +1,26 @@ +pub use crate::agents::parallel_execution_tool::types::{Task, TaskResult, Config, ExecutionResponse, ExecutionStats}; +pub use crate::agents::parallel_execution_tool::executor::parallel_execute; + +use serde_json::Value; + +pub async fn llm_parallel_execute(input: Value) -> Result { + let tasks: Vec = serde_json::from_value( + input.get("tasks") + .ok_or("Missing tasks field")? + .clone() + ).map_err(|e| format!("Failed to parse tasks: {}", e))?; + + let config: Config = if let Some(config_value) = input.get("config") { + serde_json::from_value(config_value.clone()) + .map_err(|e| format!("Failed to parse config: {}", e))? + } else { + Config::default() + }; + + // Execute tasks + let response = parallel_execute(tasks, config).await; + + // Convert response to JSON + serde_json::to_value(response) + .map_err(|e| format!("Failed to serialize response: {}", e)) +} \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/mod.rs b/crates/goose/src/agents/parallel_execution_tool/mod.rs new file mode 100644 index 000000000000..2931f580e63f --- /dev/null +++ b/crates/goose/src/agents/parallel_execution_tool/mod.rs @@ -0,0 +1,6 @@ +mod lib; +mod types; +mod executor; +mod tasks; +mod workers; +pub mod parallel_run_task_tool; \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs b/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs new file mode 100644 index 000000000000..f819f980d103 --- /dev/null +++ b/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs @@ -0,0 +1,55 @@ +use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; +use serde_json::Value; + +use crate::agents::{parallel_execution_tool::lib::llm_parallel_execute, tool_execution::ToolCallResult}; + +pub const PARALLEL_RUN_TASK_TOOL_NAME_PREFIX: &str = "parallel__run_task"; +pub fn create_parallel_run_task_tool() -> Tool { + Tool::new( + PARALLEL_RUN_TASK_TOOL_NAME_PREFIX, + "Run tasks in parallel", + serde_json::json!({ + "type": "object", + "properties": { + "tasks": { + "type": "array", + "items": { + "type": "string", + }, + "description": "The tasks to run in parallel" + }, + "config": { + "type": "object", + "properties": { + "timeout_seconds": { + "type": "number" + }, + "max_workers": { + "type": "number" + }, + "initial_workers": { + "type": "number" + } + } + } + } + }), + Some(ToolAnnotations { + title: Some("Run tasks in parallel".to_string()), + read_only_hint: false, + destructive_hint: true, + idempotent_hint: false, + open_world_hint: true, + }), + ) +} + +pub async fn run_tasks(execute_data: Value) -> ToolCallResult { + match llm_parallel_execute(execute_data).await { + Ok(result) => { + let output = serde_json::to_string(&result).unwrap(); + ToolCallResult::from(Ok(vec![Content::text(output)])) + }, + Err(e) => ToolCallResult::from(Err(ToolError::ExecutionError(e.to_string()))), + } +} \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/tasks.rs b/crates/goose/src/agents/parallel_execution_tool/tasks.rs new file mode 100644 index 000000000000..5000730c4d35 --- /dev/null +++ b/crates/goose/src/agents/parallel_execution_tool/tasks.rs @@ -0,0 +1,110 @@ +use tokio::time::{timeout, Duration}; +use serde_json::{json, Value}; +use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; + +// Process a single task based on its type +pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { + let task_clone = task.clone(); + let timeout_duration = Duration::from_secs(timeout_seconds); + + // Execute with timeout + match timeout(timeout_duration, execute_task(task_clone)).await { + Ok(Ok(data)) => TaskResult { + task_id: task.id.clone(), + status: "success".to_string(), + data: Some(data), + error: None, + }, + Ok(Err(error)) => TaskResult { + task_id: task.id.clone(), + status: "failed".to_string(), + data: None, + error: Some(error), + }, + Err(_) => TaskResult { + task_id: task.id.clone(), + status: "failed".to_string(), + data: None, + error: Some("Task timeout".to_string()), + }, + } +} + +async fn execute_task(task: Task) -> Result { + println!("Executing task: {:?}", task); + Ok(json!({ + "data": "my data", + })) +} + +// Compute task processor +async fn process_compute_task(task: &Task) -> Result { + // Simulate CPU-bound work + tokio::task::yield_now().await; + + if let Some(numbers) = task.payload.get("numbers").and_then(|v| v.as_array()) { + let sum: f64 = numbers + .iter() + .filter_map(|v| v.as_f64()) + .sum(); + + Ok(json!({ + "result": sum, + "operation": "sum" + })) + } else { + Err("Invalid compute task payload".to_string()) + } +} + +// Fetch task processor (simulated) +async fn process_fetch_task(task: &Task) -> Result { + if let Some(url) = task.payload.get("url").and_then(|v| v.as_str()) { + // Simulate network delay + tokio::time::sleep(Duration::from_millis(100)).await; + + // In real implementation, you would use reqwest here: + // let response = reqwest::get(url).await?; + + Ok(json!({ + "url": url, + "status": 200, + "data": "Simulated response data" + })) + } else { + Err("Invalid fetch task payload".to_string()) + } +} + +// Transform task processor +async fn process_transform_task(task: &Task) -> Result { + if let Some(data) = task.payload.get("data") { + if let Some(operation) = task.payload.get("operation").and_then(|v| v.as_str()) { + match operation { + "uppercase" => { + if let Some(text) = data.as_str() { + Ok(json!({ + "result": text.to_uppercase() + })) + } else { + Err("Data must be string for uppercase".to_string()) + } + } + "lowercase" => { + if let Some(text) = data.as_str() { + Ok(json!({ + "result": text.to_lowercase() + })) + } else { + Err("Data must be string for lowercase".to_string()) + } + } + _ => Err(format!("Unknown transform operation: {}", operation)), + } + } else { + Err("Missing operation in transform task".to_string()) + } + } else { + Err("Invalid transform task payload".to_string()) + } +} \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/types.rs b/crates/goose/src/agents/parallel_execution_tool/types.rs new file mode 100644 index 000000000000..561f8e45150c --- /dev/null +++ b/crates/goose/src/agents/parallel_execution_tool/types.rs @@ -0,0 +1,63 @@ +use serde::{Serialize, Deserialize}; +use serde_json::Value; + +// Task definition that LLMs will send +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Task { + pub id: String, + pub task_type: String, + pub payload: Value, +} + +// Result for each task +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TaskResult { + pub task_id: String, + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +// Configuration for the parallel executor +#[derive(Debug, Clone, Deserialize)] +pub struct Config { + #[serde(default = "default_max_workers")] + pub max_workers: usize, + #[serde(default = "default_timeout")] + pub timeout_seconds: u64, + #[serde(default = "default_initial_workers")] + pub initial_workers: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_workers: default_max_workers(), + timeout_seconds: default_timeout(), + initial_workers: default_initial_workers(), + } + } +} + +fn default_max_workers() -> usize { 10 } +fn default_timeout() -> u64 { 30 } +fn default_initial_workers() -> usize { 2 } + +// Stats for the execution +#[derive(Debug, Serialize)] +pub struct ExecutionStats { + pub total_tasks: usize, + pub completed: usize, + pub failed: usize, + pub execution_time_ms: u128, +} + +// Main response structure +#[derive(Debug, Serialize)] +pub struct ExecutionResponse { + pub status: String, + pub results: Vec, + pub stats: ExecutionStats, +} \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/workers.rs b/crates/goose/src/agents/parallel_execution_tool/workers.rs new file mode 100644 index 000000000000..198a878b211c --- /dev/null +++ b/crates/goose/src/agents/parallel_execution_tool/workers.rs @@ -0,0 +1,96 @@ +use tokio::sync::mpsc; +use tokio::time::{sleep, Duration}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; +use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; +use crate::agents::parallel_execution_tool::tasks::process_task; + +pub struct SharedState { + pub task_sender: mpsc::Sender, + pub task_receiver: Arc>>, + pub result_sender: mpsc::Sender, + pub active_workers: Arc, + pub should_stop: Arc, + pub total_tasks: Arc, + pub completed_tasks: Arc, +} + +// Spawn a worker task +pub fn spawn_worker(state: Arc, worker_id: usize, timeout_seconds: u64) { + state.active_workers.fetch_add(1, Ordering::SeqCst); + + tokio::spawn(async move { + worker_loop(state, worker_id, timeout_seconds).await; + }); +} + +async fn worker_loop(state: Arc, _worker_id: usize, timeout_seconds: u64) { + loop { + // Try to receive a task + let task = { + let mut receiver = state.task_receiver.lock().await; + receiver.recv().await + }; + + match task { + Some(task) => { + // Process the task + let result = process_task(&task, timeout_seconds).await; + + // Send result + let _ = state.result_sender.send(result).await; + + // Update completed count + state.completed_tasks.fetch_add(1, Ordering::SeqCst); + } + None => { + // Channel closed, exit worker + break; + } + } + + // Check if we should stop + if state.should_stop.load(Ordering::SeqCst) { + break; + } + } + + // Worker is exiting + state.active_workers.fetch_sub(1, Ordering::SeqCst); +} + +// Scaling controller that monitors queue and spawns workers +pub async fn run_scaler( + state: Arc, + task_count: usize, + max_workers: usize, + timeout_seconds: u64, +) { + let mut worker_count = 0; + + loop { + sleep(Duration::from_millis(100)).await; + + let active = state.active_workers.load(Ordering::SeqCst); + let completed = state.completed_tasks.load(Ordering::SeqCst); + let pending = task_count.saturating_sub(completed); + + // Simple scaling logic: spawn worker if many pending tasks and under limit + if pending > active * 2 && active < max_workers && worker_count < max_workers { + spawn_worker(state.clone(), worker_count, timeout_seconds); + worker_count += 1; + } + + // If all tasks completed, signal stop + if completed >= task_count { + state.should_stop.store(true, Ordering::SeqCst); + break; + } + + // If no active workers and tasks remaining, spawn one + if active == 0 && pending > 0 { + spawn_worker(state.clone(), worker_count, timeout_seconds); + worker_count += 1; + } + } +} \ No newline at end of file From 8f9392b5fd320f7cdeddc8154d51bdaeed20c117 Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Tue, 1 Jul 2025 14:25:39 +1000 Subject: [PATCH 02/14] fixed task schema --- .../parallel_run_task_tool.rs | 16 +++- .../agents/parallel_execution_tool/tasks.rs | 91 ++++--------------- .../agents/parallel_execution_tool/types.rs | 1 - 3 files changed, 33 insertions(+), 75 deletions(-) diff --git a/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs b/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs index f819f980d103..77f0cfa61b9d 100644 --- a/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs +++ b/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs @@ -14,7 +14,18 @@ pub fn create_parallel_run_task_tool() -> Tool { "tasks": { "type": "array", "items": { - "type": "string", + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "Unique identifier for the task" + }, + "payload": { + "type": "string", + "description": "the task description to be executed" + } + }, + "required": ["id", "payload"] }, "description": "The tasks to run in parallel" }, @@ -32,7 +43,8 @@ pub fn create_parallel_run_task_tool() -> Tool { } } } - } + }, + "required": ["tasks"] }), Some(ToolAnnotations { title: Some("Run tasks in parallel".to_string()), diff --git a/crates/goose/src/agents/parallel_execution_tool/tasks.rs b/crates/goose/src/agents/parallel_execution_tool/tasks.rs index 5000730c4d35..e0cc3f169690 100644 --- a/crates/goose/src/agents/parallel_execution_tool/tasks.rs +++ b/crates/goose/src/agents/parallel_execution_tool/tasks.rs @@ -1,4 +1,4 @@ -use tokio::time::{timeout, Duration}; +use tokio::{process::Command, time::{timeout, Duration}}; use serde_json::{json, Value}; use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; @@ -32,79 +32,26 @@ pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { async fn execute_task(task: Task) -> Result { println!("Executing task: {:?}", task); - Ok(json!({ - "data": "my data", - })) -} - -// Compute task processor -async fn process_compute_task(task: &Task) -> Result { - // Simulate CPU-bound work - tokio::task::yield_now().await; - if let Some(numbers) = task.payload.get("numbers").and_then(|v| v.as_array()) { - let sum: f64 = numbers - .iter() - .filter_map(|v| v.as_f64()) - .sum(); - - Ok(json!({ - "result": sum, - "operation": "sum" - })) - } else { - Err("Invalid compute task payload".to_string()) - } -} + let output = Command::new("goose") + .arg("run") + .arg("--text") + .arg(task.payload.to_string()) + .output() + .await + .map_err(|e| format!("Failed to run goose: {}", e))?; -// Fetch task processor (simulated) -async fn process_fetch_task(task: &Task) -> Result { - if let Some(url) = task.payload.get("url").and_then(|v| v.as_str()) { - // Simulate network delay - tokio::time::sleep(Duration::from_millis(100)).await; - - // In real implementation, you would use reqwest here: - // let response = reqwest::get(url).await?; - - Ok(json!({ - "url": url, - "status": 200, - "data": "Simulated response data" - })) - } else { - Err("Invalid fetch task payload".to_string()) + // Check for success + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(format!("Goose command failed: {}", stderr)); } -} -// Transform task processor -async fn process_transform_task(task: &Task) -> Result { - if let Some(data) = task.payload.get("data") { - if let Some(operation) = task.payload.get("operation").and_then(|v| v.as_str()) { - match operation { - "uppercase" => { - if let Some(text) = data.as_str() { - Ok(json!({ - "result": text.to_uppercase() - })) - } else { - Err("Data must be string for uppercase".to_string()) - } - } - "lowercase" => { - if let Some(text) = data.as_str() { - Ok(json!({ - "result": text.to_lowercase() - })) - } else { - Err("Data must be string for lowercase".to_string()) - } - } - _ => Err(format!("Unknown transform operation: {}", operation)), - } - } else { - Err("Missing operation in transform task".to_string()) - } - } else { - Err("Invalid transform task payload".to_string()) - } + // Parse stdout as string + let stdout = String::from_utf8_lossy(&output.stdout); + + // Wrap output in JSON + Ok(json!({ + "output": stdout.trim(), + })) } \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/types.rs b/crates/goose/src/agents/parallel_execution_tool/types.rs index 561f8e45150c..e1087566e720 100644 --- a/crates/goose/src/agents/parallel_execution_tool/types.rs +++ b/crates/goose/src/agents/parallel_execution_tool/types.rs @@ -5,7 +5,6 @@ use serde_json::Value; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Task { pub id: String, - pub task_type: String, pub payload: Value, } From e63485ba9094216934c60f3f89b61b74f85d329a Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Tue, 1 Jul 2025 19:06:14 +1000 Subject: [PATCH 03/14] run subrecipe --- crates/goose/src/agents/agent.rs | 7 ++- .../parallel_execution_tool/executor.rs | 42 +++++++------ .../src/agents/parallel_execution_tool/lib.rs | 23 ++++--- .../src/agents/parallel_execution_tool/mod.rs | 6 +- .../parallel_run_task_tool.rs | 39 ++++++++++-- .../agents/parallel_execution_tool/tasks.rs | 61 ++++++++++++++----- .../agents/parallel_execution_tool/types.rs | 17 ++++-- .../agents/parallel_execution_tool/workers.rs | 32 +++++----- .../agents/recipe_tools/sub_recipe_tools.rs | 39 ++++++++++++ crates/goose/src/agents/sub_recipe_manager.rs | 41 +++++++++++-- 10 files changed, 224 insertions(+), 83 deletions(-) diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index dcaff2d55ac3..23270976c016 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -10,7 +10,9 @@ use futures_util::stream; use futures_util::stream::StreamExt; use mcp_core::protocol::JsonRpcMessage; -use crate::agents::parallel_execution_tool::parallel_run_task_tool::{self, PARALLEL_RUN_TASK_TOOL_NAME_PREFIX}; +use crate::agents::parallel_execution_tool::parallel_run_task_tool::{ + self, PARALLEL_RUN_TASK_TOOL_NAME_PREFIX, +}; use crate::agents::sub_recipe_manager::SubRecipeManager; use crate::config::{Config, ExtensionConfigManager, PermissionManager}; use crate::message::Message; @@ -261,8 +263,9 @@ impl Agent { let extension_manager = self.extension_manager.read().await; let sub_recipe_manager = self.sub_recipe_manager.lock().await; - + println!("==========Dispatching tool call: {}", tool_call.name); let result: ToolCallResult = if sub_recipe_manager.is_sub_recipe_tool(&tool_call.name) { + println!("==========Dispatching sub recipe tool call: {}", tool_call.name); sub_recipe_manager .dispatch_sub_recipe_tool_call(&tool_call.name, tool_call.arguments.clone()) .await diff --git a/crates/goose/src/agents/parallel_execution_tool/executor.rs b/crates/goose/src/agents/parallel_execution_tool/executor.rs index f2f67ba0d1dd..7b39a974dd86 100644 --- a/crates/goose/src/agents/parallel_execution_tool/executor.rs +++ b/crates/goose/src/agents/parallel_execution_tool/executor.rs @@ -1,20 +1,22 @@ +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, AtomicBool}; use tokio::sync::mpsc; use tokio::time::Instant; -use crate::agents::parallel_execution_tool::lib::{Config, ExecutionResponse, ExecutionStats, Task, TaskResult}; +use crate::agents::parallel_execution_tool::lib::{ + Config, ExecutionResponse, ExecutionStats, Task, TaskResult, +}; use crate::agents::parallel_execution_tool::workers::{run_scaler, spawn_worker, SharedState}; // Main parallel execution function pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResponse { let start_time = Instant::now(); let task_count = tasks.len(); - + // Create channels let (task_tx, task_rx) = mpsc::channel::(task_count); let (result_tx, mut result_rx) = mpsc::channel::(task_count); - + // Initialize shared state let shared_state = Arc::new(SharedState { task_sender: task_tx.clone(), @@ -25,25 +27,31 @@ pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResp total_tasks: Arc::new(AtomicUsize::new(task_count)), completed_tasks: Arc::new(AtomicUsize::new(0)), }); - + // Send all tasks to the queue for task in tasks.clone() { let _ = task_tx.send(task).await; } // Close sender so workers know when queue is empty drop(task_tx); - + // Start initial workers for i in 0..config.initial_workers { spawn_worker(shared_state.clone(), i, config.timeout_seconds); } - + // Start the scaler let scaler_state = shared_state.clone(); let scaler_handle = tokio::spawn(async move { - run_scaler(scaler_state, task_count, config.max_workers, config.timeout_seconds).await; + run_scaler( + scaler_state, + task_count, + config.max_workers, + config.timeout_seconds, + ) + .await; }); - + // Collect results let mut results = Vec::new(); while let Some(result) = result_rx.recv().await { @@ -52,19 +60,15 @@ pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResp break; } } - + // Wait for scaler to finish let _ = scaler_handle.await; - + // Calculate stats let execution_time = start_time.elapsed().as_millis(); - let completed = results.iter() - .filter(|r| r.status == "success") - .count(); - let failed = results.iter() - .filter(|r| r.status == "failed") - .count(); - + let completed = results.iter().filter(|r| r.status == "success").count(); + let failed = results.iter().filter(|r| r.status == "failed").count(); + ExecutionResponse { status: "completed".to_string(), results, @@ -75,4 +79,4 @@ pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResp execution_time_ms: execution_time, }, } -} \ No newline at end of file +} diff --git a/crates/goose/src/agents/parallel_execution_tool/lib.rs b/crates/goose/src/agents/parallel_execution_tool/lib.rs index 1d3bcae793cb..8f90213fd398 100644 --- a/crates/goose/src/agents/parallel_execution_tool/lib.rs +++ b/crates/goose/src/agents/parallel_execution_tool/lib.rs @@ -1,26 +1,25 @@ -pub use crate::agents::parallel_execution_tool::types::{Task, TaskResult, Config, ExecutionResponse, ExecutionStats}; pub use crate::agents::parallel_execution_tool::executor::parallel_execute; +pub use crate::agents::parallel_execution_tool::types::{ + Config, ExecutionResponse, ExecutionStats, Task, TaskResult, +}; use serde_json::Value; pub async fn llm_parallel_execute(input: Value) -> Result { - let tasks: Vec = serde_json::from_value( - input.get("tasks") - .ok_or("Missing tasks field")? - .clone() - ).map_err(|e| format!("Failed to parse tasks: {}", e))?; - + let tasks: Vec = + serde_json::from_value(input.get("tasks").ok_or("Missing tasks field")?.clone()) + .map_err(|e| format!("Failed to parse tasks: {}", e))?; + let config: Config = if let Some(config_value) = input.get("config") { serde_json::from_value(config_value.clone()) .map_err(|e| format!("Failed to parse config: {}", e))? } else { Config::default() }; - + // Execute tasks let response = parallel_execute(tasks, config).await; - + // Convert response to JSON - serde_json::to_value(response) - .map_err(|e| format!("Failed to serialize response: {}", e)) -} \ No newline at end of file + serde_json::to_value(response).map_err(|e| format!("Failed to serialize response: {}", e)) +} diff --git a/crates/goose/src/agents/parallel_execution_tool/mod.rs b/crates/goose/src/agents/parallel_execution_tool/mod.rs index 2931f580e63f..f7984f662223 100644 --- a/crates/goose/src/agents/parallel_execution_tool/mod.rs +++ b/crates/goose/src/agents/parallel_execution_tool/mod.rs @@ -1,6 +1,6 @@ -mod lib; -mod types; mod executor; +pub mod lib; +pub mod parallel_run_task_tool; mod tasks; +mod types; mod workers; -pub mod parallel_run_task_tool; \ No newline at end of file diff --git a/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs b/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs index 77f0cfa61b9d..d901ae7e3752 100644 --- a/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs +++ b/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs @@ -1,7 +1,9 @@ use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; use serde_json::Value; -use crate::agents::{parallel_execution_tool::lib::llm_parallel_execute, tool_execution::ToolCallResult}; +use crate::agents::{ + parallel_execution_tool::lib::llm_parallel_execute, tool_execution::ToolCallResult, +}; pub const PARALLEL_RUN_TASK_TOOL_NAME_PREFIX: &str = "parallel__run_task"; pub fn create_parallel_run_task_tool() -> Tool { @@ -20,9 +22,36 @@ pub fn create_parallel_run_task_tool() -> Tool { "type": "string", "description": "Unique identifier for the task" }, - "payload": { + "task_type": { "type": "string", - "description": "the task description to be executed" + "description": "the type of task to execute, can be one of: sub_recipe, text_instruction" + }, + "payload": { + "type": "object", + "properties": { + "sub_recipe": { + "type": "object", + "description": "sub recipe to execute", + "properties": { + "name": { + "type": "string", + "description": "name of the sub recipe to execute" + }, + "recipe_path": { + "type": "string", + "description": "path of the sub recipe file" + }, + "command_parameters": { + "type": "object", + "description": "parameters to pass to run recipe command with sub recipe file" + } + } + }, + "text_instruction": { + "type": "string", + "description": "text instruction to execute" + } + } } }, "required": ["id", "payload"] @@ -61,7 +90,7 @@ pub async fn run_tasks(execute_data: Value) -> ToolCallResult { Ok(result) => { let output = serde_json::to_string(&result).unwrap(); ToolCallResult::from(Ok(vec![Content::text(output)])) - }, + } Err(e) => ToolCallResult::from(Err(ToolError::ExecutionError(e.to_string()))), } -} \ No newline at end of file +} diff --git a/crates/goose/src/agents/parallel_execution_tool/tasks.rs b/crates/goose/src/agents/parallel_execution_tool/tasks.rs index e0cc3f169690..c0672c61cec7 100644 --- a/crates/goose/src/agents/parallel_execution_tool/tasks.rs +++ b/crates/goose/src/agents/parallel_execution_tool/tasks.rs @@ -1,12 +1,15 @@ -use tokio::{process::Command, time::{timeout, Duration}}; -use serde_json::{json, Value}; use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; +use serde_json::{json, Value}; +use tokio::{ + process::Command, + time::{timeout, Duration}, +}; // Process a single task based on its type pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { let task_clone = task.clone(); let timeout_duration = Duration::from_secs(timeout_seconds); - + // Execute with timeout match timeout(timeout_duration, execute_task(task_clone)).await { Ok(Ok(data)) => TaskResult { @@ -31,24 +34,50 @@ pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { } async fn execute_task(task: Task) -> Result { - println!("Executing task: {:?}", task); - - let output = Command::new("goose") - .arg("run") - .arg("--text") - .arg(task.payload.to_string()) - .output() - .await - .map_err(|e| format!("Failed to run goose: {}", e))?; - + println!("=======Executing task: {:?}", task); + let run_command_output = if task.task_type == "sub_recipe" { + let sub_recipe = task.payload.get("sub_recipe").unwrap(); + let name = sub_recipe.get("name").unwrap().as_str().unwrap(); + let path = sub_recipe.get("recipe_path").unwrap().as_str().unwrap(); + let command_parameters = sub_recipe.get("command_parameters").unwrap(); + let mut command = Command::new("goose"); + command.arg("run").arg("--recipe").arg(path); + if let Some(params_map) = command_parameters.as_object() { + for (key, value) in params_map { + let key_str = key.to_string(); + let value_str = value.as_str().unwrap_or(&value.to_string()).to_string(); + command + .arg("--params") + .arg(format!("{}={}", key_str, value_str)); + } + } + command + .output() + .await + .map_err(|e| format!("Failed to run goose: {}", e))? + } else { + Command::new("goose") + .arg("run") + .arg("--text") + .arg( + task.payload + .get("text_instruction") + .unwrap() + .as_str() + .unwrap(), + ) + .output() + .await + .map_err(|e| format!("Failed to run goose: {}", e))? + }; // Check for success - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); + if !run_command_output.status.success() { + let stderr = String::from_utf8_lossy(&run_command_output.stderr); return Err(format!("Goose command failed: {}", stderr)); } // Parse stdout as string - let stdout = String::from_utf8_lossy(&output.stdout); + let stdout = String::from_utf8_lossy(&run_command_output.stdout); // Wrap output in JSON Ok(json!({ diff --git a/crates/goose/src/agents/parallel_execution_tool/types.rs b/crates/goose/src/agents/parallel_execution_tool/types.rs index e1087566e720..ede71dbf40b4 100644 --- a/crates/goose/src/agents/parallel_execution_tool/types.rs +++ b/crates/goose/src/agents/parallel_execution_tool/types.rs @@ -1,10 +1,11 @@ -use serde::{Serialize, Deserialize}; +use serde::{Deserialize, Serialize}; use serde_json::Value; // Task definition that LLMs will send #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Task { pub id: String, + pub task_type: String, pub payload: Value, } @@ -40,9 +41,15 @@ impl Default for Config { } } -fn default_max_workers() -> usize { 10 } -fn default_timeout() -> u64 { 30 } -fn default_initial_workers() -> usize { 2 } +fn default_max_workers() -> usize { + 10 +} +fn default_timeout() -> u64 { + 300 +} +fn default_initial_workers() -> usize { + 2 +} // Stats for the execution #[derive(Debug, Serialize)] @@ -59,4 +66,4 @@ pub struct ExecutionResponse { pub status: String, pub results: Vec, pub stats: ExecutionStats, -} \ No newline at end of file +} diff --git a/crates/goose/src/agents/parallel_execution_tool/workers.rs b/crates/goose/src/agents/parallel_execution_tool/workers.rs index 198a878b211c..404172046c74 100644 --- a/crates/goose/src/agents/parallel_execution_tool/workers.rs +++ b/crates/goose/src/agents/parallel_execution_tool/workers.rs @@ -1,9 +1,9 @@ +use crate::agents::parallel_execution_tool::tasks::process_task; +use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering}; -use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; -use crate::agents::parallel_execution_tool::tasks::process_task; pub struct SharedState { pub task_sender: mpsc::Sender, @@ -18,7 +18,7 @@ pub struct SharedState { // Spawn a worker task pub fn spawn_worker(state: Arc, worker_id: usize, timeout_seconds: u64) { state.active_workers.fetch_add(1, Ordering::SeqCst); - + tokio::spawn(async move { worker_loop(state, worker_id, timeout_seconds).await; }); @@ -31,15 +31,15 @@ async fn worker_loop(state: Arc, _worker_id: usize, timeout_seconds let mut receiver = state.task_receiver.lock().await; receiver.recv().await }; - + match task { Some(task) => { // Process the task let result = process_task(&task, timeout_seconds).await; - + // Send result let _ = state.result_sender.send(result).await; - + // Update completed count state.completed_tasks.fetch_add(1, Ordering::SeqCst); } @@ -48,13 +48,13 @@ async fn worker_loop(state: Arc, _worker_id: usize, timeout_seconds break; } } - + // Check if we should stop if state.should_stop.load(Ordering::SeqCst) { break; } } - + // Worker is exiting state.active_workers.fetch_sub(1, Ordering::SeqCst); } @@ -67,30 +67,30 @@ pub async fn run_scaler( timeout_seconds: u64, ) { let mut worker_count = 0; - + loop { sleep(Duration::from_millis(100)).await; - + let active = state.active_workers.load(Ordering::SeqCst); let completed = state.completed_tasks.load(Ordering::SeqCst); let pending = task_count.saturating_sub(completed); - + // Simple scaling logic: spawn worker if many pending tasks and under limit if pending > active * 2 && active < max_workers && worker_count < max_workers { spawn_worker(state.clone(), worker_count, timeout_seconds); worker_count += 1; } - + // If all tasks completed, signal stop if completed >= task_count { state.should_stop.store(true, Ordering::SeqCst); break; } - + // If no active workers and tasks remaining, spawn one if active == 0 && pending > 0 { spawn_worker(state.clone(), worker_count, timeout_seconds); worker_count += 1; } } -} \ No newline at end of file +} diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs index 2fd4f50434a9..4feb7fa81e34 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs @@ -6,10 +6,13 @@ use serde_json::{json, Map, Value}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; +use crate::agents::parallel_execution_tool::lib::Task; use crate::recipe::{Recipe, RecipeParameter, RecipeParameterRequirement, SubRecipe}; pub const SUB_RECIPE_TOOL_NAME_PREFIX: &str = "subrecipe__run_"; +pub const SUB_RECIPE_TASK_TOOL_NAME_PREFIX: &str = "subrecipe__create_task"; + pub fn create_sub_recipe_tool(sub_recipe: &SubRecipe) -> Tool { let input_schema = get_input_schema(sub_recipe).unwrap(); Tool::new( @@ -30,6 +33,22 @@ pub fn create_sub_recipe_tool(sub_recipe: &SubRecipe) -> Tool { ) } +pub fn create_sub_recipe_task_tool(sub_recipe: &SubRecipe) -> Tool { + let input_schema = get_input_schema(sub_recipe).unwrap(); + Tool::new( + format!("{}_{}", SUB_RECIPE_TASK_TOOL_NAME_PREFIX, sub_recipe.name), + "Before running this sub recipe, you should first create a task with this tool and then pass the task to the task executor".to_string(), + input_schema, + Some(ToolAnnotations { + title: Some(format!("create sub recipe task {}", sub_recipe.name)), + read_only_hint: false, + destructive_hint: true, + idempotent_hint: false, + open_world_hint: true, + }), + ) +} + fn get_sub_recipe_parameter_definition( sub_recipe: &SubRecipe, ) -> Result>> { @@ -99,6 +118,26 @@ fn prepare_command_params( Ok(sub_recipe_params) } +pub async fn create_sub_recipe_task(sub_recipe: &SubRecipe, params: Value) -> Result { + println!("==========Creating task for sub recipe: {}", sub_recipe.name); + let command_params = prepare_command_params(sub_recipe, params)?; + let payload = json!({ + "sub_recipe": { + "name": sub_recipe.name.clone(), + "command_parameters": command_params, + "recipe_path": sub_recipe.path.clone(), + } + }); + let task = Task { + id: uuid::Uuid::new_v4().to_string(), + task_type: "sub_recipe".to_string(), + payload: payload, + }; + let task_json = serde_json::to_string(&task).map_err(|e| anyhow::anyhow!("Failed to serialize Task: {}", e))?; + println!("==========Created task: {}", task_json); + Ok(task_json) +} + pub async fn run_sub_recipe(sub_recipe: &SubRecipe, params: Value) -> Result { let command_params = prepare_command_params(sub_recipe, params)?; diff --git a/crates/goose/src/agents/sub_recipe_manager.rs b/crates/goose/src/agents/sub_recipe_manager.rs index 3637c947c97c..32b4ed4f9ff2 100644 --- a/crates/goose/src/agents/sub_recipe_manager.rs +++ b/crates/goose/src/agents/sub_recipe_manager.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use crate::{ agents::{ recipe_tools::sub_recipe_tools::{ - create_sub_recipe_tool, run_sub_recipe, SUB_RECIPE_TOOL_NAME_PREFIX, + create_sub_recipe_task, create_sub_recipe_task_tool, create_sub_recipe_tool, run_sub_recipe, SUB_RECIPE_TASK_TOOL_NAME_PREFIX, SUB_RECIPE_TOOL_NAME_PREFIX }, tool_execution::ToolCallResult, }, @@ -34,12 +34,18 @@ impl SubRecipeManager { pub fn add_sub_recipe_tools(&mut self, sub_recipes_to_add: Vec) { for sub_recipe in sub_recipes_to_add { + // let sub_recipe_key = format!( + // "{}_{}", + // SUB_RECIPE_TOOL_NAME_PREFIX, + // sub_recipe.name.clone() + // ); + // let tool = create_sub_recipe_tool(&sub_recipe); let sub_recipe_key = format!( "{}_{}", - SUB_RECIPE_TOOL_NAME_PREFIX, + SUB_RECIPE_TASK_TOOL_NAME_PREFIX, sub_recipe.name.clone() ); - let tool = create_sub_recipe_tool(&sub_recipe); + let tool = create_sub_recipe_task_tool(&sub_recipe); self.sub_recipe_tools.insert(sub_recipe_key.clone(), tool); self.sub_recipes.insert(sub_recipe_key.clone(), sub_recipe); } @@ -61,6 +67,31 @@ impl SubRecipeManager { } } + // async fn call_sub_recipe_tool( + // &self, + // tool_name: &str, + // params: Value, + // ) -> Result, ToolError> { + // let sub_recipe = self.sub_recipes.get(tool_name).ok_or_else(|| { + // let sub_recipe_name = tool_name + // .strip_prefix(SUB_RECIPE_TOOL_NAME_PREFIX) + // .and_then(|s| s.strip_prefix("_")) + // .ok_or_else(|| { + // ToolError::InvalidParameters(format!( + // "Invalid sub-recipe tool name format: {}", + // tool_name + // )) + // }) + // .unwrap(); + + // ToolError::InvalidParameters(format!("Sub-recipe '{}' not found", sub_recipe_name)) + // })?; + + // let output = run_sub_recipe(sub_recipe, params).await.map_err(|e| { + // ToolError::ExecutionError(format!("Sub-recipe execution failed: {}", e)) + // })?; + // Ok(vec![Content::text(output)]) + // } async fn call_sub_recipe_tool( &self, tool_name: &str, @@ -68,7 +99,7 @@ impl SubRecipeManager { ) -> Result, ToolError> { let sub_recipe = self.sub_recipes.get(tool_name).ok_or_else(|| { let sub_recipe_name = tool_name - .strip_prefix(SUB_RECIPE_TOOL_NAME_PREFIX) + .strip_prefix(SUB_RECIPE_TASK_TOOL_NAME_PREFIX) .and_then(|s| s.strip_prefix("_")) .ok_or_else(|| { ToolError::InvalidParameters(format!( @@ -81,7 +112,7 @@ impl SubRecipeManager { ToolError::InvalidParameters(format!("Sub-recipe '{}' not found", sub_recipe_name)) })?; - let output = run_sub_recipe(sub_recipe, params).await.map_err(|e| { + let output = create_sub_recipe_task(sub_recipe, params).await.map_err(|e| { ToolError::ExecutionError(format!("Sub-recipe execution failed: {}", e)) })?; Ok(vec![Content::text(output)]) From bab0b987ecfba74574c1ca99956aeb9287a7ff9d Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Tue, 1 Jul 2025 19:53:20 +1000 Subject: [PATCH 04/14] print output lively --- .../agents/parallel_execution_tool/tasks.rs | 78 ++++++++++--------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/crates/goose/src/agents/parallel_execution_tool/tasks.rs b/crates/goose/src/agents/parallel_execution_tool/tasks.rs index c0672c61cec7..4c9710bf9562 100644 --- a/crates/goose/src/agents/parallel_execution_tool/tasks.rs +++ b/crates/goose/src/agents/parallel_execution_tool/tasks.rs @@ -1,9 +1,11 @@ -use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; +use tokio::process::Command; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::time::timeout; +use std::process::Stdio; +use std::time::Duration; use serde_json::{json, Value}; -use tokio::{ - process::Command, - time::{timeout, Duration}, -}; + +use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; // Process a single task based on its type pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { @@ -35,52 +37,52 @@ pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { async fn execute_task(task: Task) -> Result { println!("=======Executing task: {:?}", task); - let run_command_output = if task.task_type == "sub_recipe" { + + let mut command = if task.task_type == "sub_recipe" { let sub_recipe = task.payload.get("sub_recipe").unwrap(); let name = sub_recipe.get("name").unwrap().as_str().unwrap(); let path = sub_recipe.get("recipe_path").unwrap().as_str().unwrap(); let command_parameters = sub_recipe.get("command_parameters").unwrap(); - let mut command = Command::new("goose"); - command.arg("run").arg("--recipe").arg(path); + let mut cmd = Command::new("goose"); + cmd.arg("run").arg("--recipe").arg(path); if let Some(params_map) = command_parameters.as_object() { for (key, value) in params_map { let key_str = key.to_string(); let value_str = value.as_str().unwrap_or(&value.to_string()).to_string(); - command - .arg("--params") - .arg(format!("{}={}", key_str, value_str)); + cmd.arg("--params").arg(format!("{}={}", key_str, value_str)); } } - command - .output() - .await - .map_err(|e| format!("Failed to run goose: {}", e))? + cmd } else { - Command::new("goose") - .arg("run") - .arg("--text") - .arg( - task.payload - .get("text_instruction") - .unwrap() - .as_str() - .unwrap(), - ) - .output() - .await - .map_err(|e| format!("Failed to run goose: {}", e))? + let text = task.payload.get("text_instruction").unwrap().as_str().unwrap(); + let mut cmd = Command::new("goose"); + cmd.arg("run").arg("--text").arg(text); + cmd }; - // Check for success - if !run_command_output.status.success() { - let stderr = String::from_utf8_lossy(&run_command_output.stderr); - return Err(format!("Goose command failed: {}", stderr)); + + // Configure to capture stdout + command.stdout(Stdio::piped()); + command.stderr(Stdio::piped()); + + // Spawn the child process + let mut child = command.spawn().map_err(|e| format!("Failed to spawn goose: {}", e))?; + + // Pipe the stdout + if let Some(stdout) = child.stdout.take() { + let reader = BufReader::new(stdout); + let mut lines = reader.lines(); + + println!("--- Goose output ---"); + while let Ok(Some(line)) = lines.next_line().await { + println!("{}", line); + } } - // Parse stdout as string - let stdout = String::from_utf8_lossy(&run_command_output.stdout); + // Await final status + let status = child.wait().await.map_err(|e| format!("Failed to wait on goose: {}", e))?; + if !status.success() { + return Err(format!("Goose command failed with exit code: {:?}", status.code())); + } - // Wrap output in JSON - Ok(json!({ - "output": stdout.trim(), - })) + Ok(json!({ "output": "Goose command completed." })) } \ No newline at end of file From daed8a61acdf0924fb437c59c926e8577630391a Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Tue, 1 Jul 2025 21:24:49 +1000 Subject: [PATCH 05/14] run single task --- crates/goose/src/agents/agent.rs | 11 ++++---- crates/goose/src/agents/mod.rs | 2 +- .../agents/recipe_tools/sub_recipe_tools.rs | 2 +- .../executor.rs | 25 +++++++++++++++++-- .../lib.rs | 12 ++++++--- .../mod.rs | 2 +- .../sub_agent_execute_task_tool.rs} | 14 ++++++----- .../tasks.rs | 2 +- .../types.rs | 0 .../workers.rs | 4 +-- 10 files changed, 51 insertions(+), 23 deletions(-) rename crates/goose/src/agents/{parallel_execution_tool => sub_agent_execution_tool}/executor.rs (73%) rename crates/goose/src/agents/{parallel_execution_tool => sub_agent_execution_tool}/lib.rs (58%) rename crates/goose/src/agents/{parallel_execution_tool => sub_agent_execution_tool}/mod.rs (62%) rename crates/goose/src/agents/{parallel_execution_tool/parallel_run_task_tool.rs => sub_agent_execution_tool/sub_agent_execute_task_tool.rs} (85%) rename crates/goose/src/agents/{parallel_execution_tool => sub_agent_execution_tool}/tasks.rs (97%) rename crates/goose/src/agents/{parallel_execution_tool => sub_agent_execution_tool}/types.rs (100%) rename crates/goose/src/agents/{parallel_execution_tool => sub_agent_execution_tool}/workers.rs (95%) diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index 23270976c016..c7fa43709558 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -10,8 +10,8 @@ use futures_util::stream; use futures_util::stream::StreamExt; use mcp_core::protocol::JsonRpcMessage; -use crate::agents::parallel_execution_tool::parallel_run_task_tool::{ - self, PARALLEL_RUN_TASK_TOOL_NAME_PREFIX, +use crate::agents::sub_agent_execution_tool::sub_agent_execute_task_tool::{ + self, SUB_AGENT_EXECUTE_TASK_TOOL_NAME, }; use crate::agents::sub_recipe_manager::SubRecipeManager; use crate::config::{Config, ExtensionConfigManager, PermissionManager}; @@ -263,14 +263,13 @@ impl Agent { let extension_manager = self.extension_manager.read().await; let sub_recipe_manager = self.sub_recipe_manager.lock().await; - println!("==========Dispatching tool call: {}", tool_call.name); let result: ToolCallResult = if sub_recipe_manager.is_sub_recipe_tool(&tool_call.name) { println!("==========Dispatching sub recipe tool call: {}", tool_call.name); sub_recipe_manager .dispatch_sub_recipe_tool_call(&tool_call.name, tool_call.arguments.clone()) .await - } else if tool_call.name == PARALLEL_RUN_TASK_TOOL_NAME_PREFIX { - parallel_run_task_tool::run_tasks(tool_call.arguments.clone()).await + } else if tool_call.name == SUB_AGENT_EXECUTE_TASK_TOOL_NAME { + sub_agent_execute_task_tool::run_tasks(tool_call.arguments.clone()).await } else if tool_call.name == PLATFORM_READ_RESOURCE_TOOL_NAME { // Check if the tool is read_resource and handle it separately ToolCallResult::from( @@ -551,7 +550,7 @@ impl Agent { let sub_recipe_manager = self.sub_recipe_manager.lock().await; prefixed_tools.extend(sub_recipe_manager.sub_recipe_tools.values().cloned()); } - prefixed_tools.push(parallel_run_task_tool::create_parallel_run_task_tool()); + prefixed_tools.push(sub_agent_execute_task_tool::create_sub_agent_execute_task_tool()); prefixed_tools } diff --git a/crates/goose/src/agents/mod.rs b/crates/goose/src/agents/mod.rs index d793651e3170..bbc2ca9c3f80 100644 --- a/crates/goose/src/agents/mod.rs +++ b/crates/goose/src/agents/mod.rs @@ -3,7 +3,7 @@ mod context; pub mod extension; pub mod extension_manager; mod large_response_handler; -pub mod parallel_execution_tool; +pub mod sub_agent_execution_tool; pub mod platform_tools; pub mod prompt_manager; mod recipe_tools; diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs index 4feb7fa81e34..ae468ac2724f 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs @@ -6,7 +6,7 @@ use serde_json::{json, Map, Value}; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; -use crate::agents::parallel_execution_tool::lib::Task; +use crate::agents::sub_agent_execution_tool::lib::Task; use crate::recipe::{Recipe, RecipeParameter, RecipeParameterRequirement, SubRecipe}; pub const SUB_RECIPE_TOOL_NAME_PREFIX: &str = "subrecipe__run_"; diff --git a/crates/goose/src/agents/parallel_execution_tool/executor.rs b/crates/goose/src/agents/sub_agent_execution_tool/executor.rs similarity index 73% rename from crates/goose/src/agents/parallel_execution_tool/executor.rs rename to crates/goose/src/agents/sub_agent_execution_tool/executor.rs index 7b39a974dd86..cefff4b7f732 100644 --- a/crates/goose/src/agents/parallel_execution_tool/executor.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/executor.rs @@ -3,10 +3,31 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::Instant; -use crate::agents::parallel_execution_tool::lib::{ +use crate::agents::sub_agent_execution_tool::lib::{ Config, ExecutionResponse, ExecutionStats, Task, TaskResult, }; -use crate::agents::parallel_execution_tool::workers::{run_scaler, spawn_worker, SharedState}; +use crate::agents::sub_agent_execution_tool::tasks::process_task; +use crate::agents::sub_agent_execution_tool::workers::{run_scaler, spawn_worker, SharedState}; + +pub async fn execute_single_task(task: &Task, config: Config) -> ExecutionResponse { + let start_time = Instant::now(); + let result = process_task(task, config.timeout_seconds).await; + + let execution_time = start_time.elapsed().as_millis(); + let completed = if result.status == "success" { 1 } else { 0 }; + let failed = if result.status == "failed" { 1 } else { 0 }; + + return ExecutionResponse { + status: "completed".to_string(), + results: vec![result], + stats: ExecutionStats { + total_tasks: 1, + completed, + failed, + execution_time_ms: execution_time, + }, + }; +} // Main parallel execution function pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResponse { diff --git a/crates/goose/src/agents/parallel_execution_tool/lib.rs b/crates/goose/src/agents/sub_agent_execution_tool/lib.rs similarity index 58% rename from crates/goose/src/agents/parallel_execution_tool/lib.rs rename to crates/goose/src/agents/sub_agent_execution_tool/lib.rs index 8f90213fd398..b3c42b6f35aa 100644 --- a/crates/goose/src/agents/parallel_execution_tool/lib.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/lib.rs @@ -1,11 +1,12 @@ -pub use crate::agents::parallel_execution_tool::executor::parallel_execute; -pub use crate::agents::parallel_execution_tool::types::{ +use crate::agents::sub_agent_execution_tool::executor::execute_single_task; +pub use crate::agents::sub_agent_execution_tool::executor::parallel_execute; +pub use crate::agents::sub_agent_execution_tool::types::{ Config, ExecutionResponse, ExecutionStats, Task, TaskResult, }; use serde_json::Value; -pub async fn llm_parallel_execute(input: Value) -> Result { +pub async fn execute_tasks(input: Value) -> Result { let tasks: Vec = serde_json::from_value(input.get("tasks").ok_or("Missing tasks field")?.clone()) .map_err(|e| format!("Failed to parse tasks: {}", e))?; @@ -16,6 +17,11 @@ pub async fn llm_parallel_execute(input: Value) -> Result { } else { Config::default() }; + let task_count = tasks.len(); + if task_count == 1 { + let response = execute_single_task(&tasks[0], config).await; + return serde_json::to_value(response).map_err(|e| format!("Failed to serialize response: {}", e)); + } // Execute tasks let response = parallel_execute(tasks, config).await; diff --git a/crates/goose/src/agents/parallel_execution_tool/mod.rs b/crates/goose/src/agents/sub_agent_execution_tool/mod.rs similarity index 62% rename from crates/goose/src/agents/parallel_execution_tool/mod.rs rename to crates/goose/src/agents/sub_agent_execution_tool/mod.rs index f7984f662223..360f36e301aa 100644 --- a/crates/goose/src/agents/parallel_execution_tool/mod.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/mod.rs @@ -1,6 +1,6 @@ mod executor; pub mod lib; -pub mod parallel_run_task_tool; +pub mod sub_agent_execute_task_tool; mod tasks; mod types; mod workers; diff --git a/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs b/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs similarity index 85% rename from crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs rename to crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs index d901ae7e3752..f4de7503c72c 100644 --- a/crates/goose/src/agents/parallel_execution_tool/parallel_run_task_tool.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs @@ -2,14 +2,16 @@ use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; use serde_json::Value; use crate::agents::{ - parallel_execution_tool::lib::llm_parallel_execute, tool_execution::ToolCallResult, + sub_agent_execution_tool::lib::execute_tasks, tool_execution::ToolCallResult, }; -pub const PARALLEL_RUN_TASK_TOOL_NAME_PREFIX: &str = "parallel__run_task"; -pub fn create_parallel_run_task_tool() -> Tool { +pub const SUB_AGENT_EXECUTE_TASK_TOOL_NAME: &str = "sub_agent__execute_task"; +pub fn create_sub_agent_execute_task_tool() -> Tool { Tool::new( - PARALLEL_RUN_TASK_TOOL_NAME_PREFIX, - "Run tasks in parallel", + SUB_AGENT_EXECUTE_TASK_TOOL_NAME, + "Only use this tool when you want to execute sub agent task or sub recipe task. + If the tasks are not specified to be executed in parallel, you should use this tool to run each task immediately by passing a single task to the tool for each run. + If you want to execute tasks in parallel, you should pass a list of tasks to the tool.", serde_json::json!({ "type": "object", "properties": { @@ -86,7 +88,7 @@ pub fn create_parallel_run_task_tool() -> Tool { } pub async fn run_tasks(execute_data: Value) -> ToolCallResult { - match llm_parallel_execute(execute_data).await { + match execute_tasks(execute_data).await { Ok(result) => { let output = serde_json::to_string(&result).unwrap(); ToolCallResult::from(Ok(vec![Content::text(output)])) diff --git a/crates/goose/src/agents/parallel_execution_tool/tasks.rs b/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs similarity index 97% rename from crates/goose/src/agents/parallel_execution_tool/tasks.rs rename to crates/goose/src/agents/sub_agent_execution_tool/tasks.rs index 4c9710bf9562..73537a80e87d 100644 --- a/crates/goose/src/agents/parallel_execution_tool/tasks.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs @@ -5,7 +5,7 @@ use std::process::Stdio; use std::time::Duration; use serde_json::{json, Value}; -use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; +use crate::agents::sub_agent_execution_tool::types::{Task, TaskResult}; // Process a single task based on its type pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { diff --git a/crates/goose/src/agents/parallel_execution_tool/types.rs b/crates/goose/src/agents/sub_agent_execution_tool/types.rs similarity index 100% rename from crates/goose/src/agents/parallel_execution_tool/types.rs rename to crates/goose/src/agents/sub_agent_execution_tool/types.rs diff --git a/crates/goose/src/agents/parallel_execution_tool/workers.rs b/crates/goose/src/agents/sub_agent_execution_tool/workers.rs similarity index 95% rename from crates/goose/src/agents/parallel_execution_tool/workers.rs rename to crates/goose/src/agents/sub_agent_execution_tool/workers.rs index 404172046c74..6efe5a7a6b89 100644 --- a/crates/goose/src/agents/parallel_execution_tool/workers.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/workers.rs @@ -1,5 +1,5 @@ -use crate::agents::parallel_execution_tool::tasks::process_task; -use crate::agents::parallel_execution_tool::types::{Task, TaskResult}; +use crate::agents::sub_agent_execution_tool::tasks::process_task; +use crate::agents::sub_agent_execution_tool::types::{Task, TaskResult}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::mpsc; From 9d2b4b6ce12aa78dd2ac177b9003cc60e0d5cebe Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Thu, 3 Jul 2025 11:42:38 +1000 Subject: [PATCH 06/14] fmt --- crates/goose/src/agents/agent.rs | 5 +- crates/goose/src/agents/mod.rs | 2 +- .../agents/recipe_tools/sub_recipe_tools.rs | 98 ++----------------- .../sub_agent_execution_tool/executor.rs | 8 +- .../agents/sub_agent_execution_tool/lib.rs | 3 +- .../sub_agent_execute_task_tool.rs | 4 +- .../agents/sub_agent_execution_tool/tasks.rs | 35 ++++--- .../sub_agent_execution_tool/workers.rs | 2 - crates/goose/src/agents/sub_recipe_manager.rs | 10 +- 9 files changed, 48 insertions(+), 119 deletions(-) diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index c7fa43709558..561032a87652 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -264,7 +264,10 @@ impl Agent { let extension_manager = self.extension_manager.read().await; let sub_recipe_manager = self.sub_recipe_manager.lock().await; let result: ToolCallResult = if sub_recipe_manager.is_sub_recipe_tool(&tool_call.name) { - println!("==========Dispatching sub recipe tool call: {}", tool_call.name); + println!( + "==========Dispatching sub recipe tool call: {}", + tool_call.name + ); sub_recipe_manager .dispatch_sub_recipe_tool_call(&tool_call.name, tool_call.arguments.clone()) .await diff --git a/crates/goose/src/agents/mod.rs b/crates/goose/src/agents/mod.rs index bbc2ca9c3f80..043cca27ce34 100644 --- a/crates/goose/src/agents/mod.rs +++ b/crates/goose/src/agents/mod.rs @@ -3,7 +3,6 @@ mod context; pub mod extension; pub mod extension_manager; mod large_response_handler; -pub mod sub_agent_execution_tool; pub mod platform_tools; pub mod prompt_manager; mod recipe_tools; @@ -11,6 +10,7 @@ mod reply_parts; mod router_tool_selector; mod router_tools; mod schedule_tool; +pub mod sub_agent_execution_tool; pub mod sub_recipe_manager; pub mod subagent; pub mod subagent_handler; diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs index ae468ac2724f..546d6497f2b4 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs @@ -3,36 +3,12 @@ use std::{collections::HashMap, fs}; use anyhow::Result; use mcp_core::tool::{Tool, ToolAnnotations}; use serde_json::{json, Map, Value}; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::Command; use crate::agents::sub_agent_execution_tool::lib::Task; use crate::recipe::{Recipe, RecipeParameter, RecipeParameterRequirement, SubRecipe}; -pub const SUB_RECIPE_TOOL_NAME_PREFIX: &str = "subrecipe__run_"; - pub const SUB_RECIPE_TASK_TOOL_NAME_PREFIX: &str = "subrecipe__create_task"; -pub fn create_sub_recipe_tool(sub_recipe: &SubRecipe) -> Tool { - let input_schema = get_input_schema(sub_recipe).unwrap(); - Tool::new( - format!("{}_{}", SUB_RECIPE_TOOL_NAME_PREFIX, sub_recipe.name), - "Run a sub recipe. - Use this tool when you need to run a sub-recipe. - The sub recipe will be run with the provided parameters - and return the output of the sub recipe." - .to_string(), - input_schema, - Some(ToolAnnotations { - title: Some(format!("run sub recipe {}", sub_recipe.name)), - read_only_hint: false, - destructive_hint: true, - idempotent_hint: false, - open_world_hint: true, - }), - ) -} - pub fn create_sub_recipe_task_tool(sub_recipe: &SubRecipe) -> Tool { let input_schema = get_input_schema(sub_recipe).unwrap(); Tool::new( @@ -119,7 +95,10 @@ fn prepare_command_params( } pub async fn create_sub_recipe_task(sub_recipe: &SubRecipe, params: Value) -> Result { - println!("==========Creating task for sub recipe: {}", sub_recipe.name); + println!( + "==========Creating task for sub recipe: {}", + sub_recipe.name + ); let command_params = prepare_command_params(sub_recipe, params)?; let payload = json!({ "sub_recipe": { @@ -131,76 +110,13 @@ pub async fn create_sub_recipe_task(sub_recipe: &SubRecipe, params: Value) -> Re let task = Task { id: uuid::Uuid::new_v4().to_string(), task_type: "sub_recipe".to_string(), - payload: payload, + payload, }; - let task_json = serde_json::to_string(&task).map_err(|e| anyhow::anyhow!("Failed to serialize Task: {}", e))?; + let task_json = serde_json::to_string(&task) + .map_err(|e| anyhow::anyhow!("Failed to serialize Task: {}", e))?; println!("==========Created task: {}", task_json); Ok(task_json) } -pub async fn run_sub_recipe(sub_recipe: &SubRecipe, params: Value) -> Result { - let command_params = prepare_command_params(sub_recipe, params)?; - - let mut command = Command::new("goose"); - command.arg("run").arg("--recipe").arg(&sub_recipe.path); - - for (key, value) in command_params { - command.arg("--params").arg(format!("{}={}", key, value)); - } - - command.stdout(std::process::Stdio::piped()); - command.stderr(std::process::Stdio::piped()); - - let mut child = command - .spawn() - .map_err(|e| anyhow::anyhow!("Failed to spawn: {}", e))?; - - let stdout = child.stdout.take().expect("Failed to capture stdout"); - let stderr = child.stderr.take().expect("Failed to capture stderr"); - - let mut stdout_reader = BufReader::new(stdout).lines(); - let mut stderr_reader = BufReader::new(stderr).lines(); - let stdout_sub_recipe_name = sub_recipe.name.clone(); - let stderr_sub_recipe_name = sub_recipe.name.clone(); - - // Spawn background tasks to read from stdout and stderr - let stdout_task = tokio::spawn(async move { - let mut buffer = String::new(); - while let Ok(Some(line)) = stdout_reader.next_line().await { - println!("[sub-recipe {}] {}", stdout_sub_recipe_name, line); - buffer.push_str(&line); - buffer.push('\n'); - } - buffer - }); - - let stderr_task = tokio::spawn(async move { - let mut buffer = String::new(); - while let Ok(Some(line)) = stderr_reader.next_line().await { - eprintln!( - "[stderr for sub-recipe {}] {}", - stderr_sub_recipe_name, line - ); - buffer.push_str(&line); - buffer.push('\n'); - } - buffer - }); - - let status = child - .wait() - .await - .map_err(|e| anyhow::anyhow!("Failed to wait for process: {}", e))?; - - let stdout_output = stdout_task.await.unwrap(); - let stderr_output = stderr_task.await.unwrap(); - - if status.success() { - Ok(stdout_output) - } else { - Err(anyhow::anyhow!("Command failed:\n{}", stderr_output)) - } -} - #[cfg(test)] mod tests; diff --git a/crates/goose/src/agents/sub_agent_execution_tool/executor.rs b/crates/goose/src/agents/sub_agent_execution_tool/executor.rs index cefff4b7f732..2104b221ced7 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/executor.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/executor.rs @@ -12,12 +12,12 @@ use crate::agents::sub_agent_execution_tool::workers::{run_scaler, spawn_worker, pub async fn execute_single_task(task: &Task, config: Config) -> ExecutionResponse { let start_time = Instant::now(); let result = process_task(task, config.timeout_seconds).await; - + let execution_time = start_time.elapsed().as_millis(); let completed = if result.status == "success" { 1 } else { 0 }; let failed = if result.status == "failed" { 1 } else { 0 }; - return ExecutionResponse { + ExecutionResponse { status: "completed".to_string(), results: vec![result], stats: ExecutionStats { @@ -26,7 +26,7 @@ pub async fn execute_single_task(task: &Task, config: Config) -> ExecutionRespon failed, execution_time_ms: execution_time, }, - }; + } } // Main parallel execution function @@ -40,12 +40,10 @@ pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResp // Initialize shared state let shared_state = Arc::new(SharedState { - task_sender: task_tx.clone(), task_receiver: Arc::new(tokio::sync::Mutex::new(task_rx)), result_sender: result_tx, active_workers: Arc::new(AtomicUsize::new(0)), should_stop: Arc::new(AtomicBool::new(false)), - total_tasks: Arc::new(AtomicUsize::new(task_count)), completed_tasks: Arc::new(AtomicUsize::new(0)), }); diff --git a/crates/goose/src/agents/sub_agent_execution_tool/lib.rs b/crates/goose/src/agents/sub_agent_execution_tool/lib.rs index b3c42b6f35aa..4ecc6bb838f8 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/lib.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/lib.rs @@ -20,7 +20,8 @@ pub async fn execute_tasks(input: Value) -> Result { let task_count = tasks.len(); if task_count == 1 { let response = execute_single_task(&tasks[0], config).await; - return serde_json::to_value(response).map_err(|e| format!("Failed to serialize response: {}", e)); + return serde_json::to_value(response) + .map_err(|e| format!("Failed to serialize response: {}", e)); } // Execute tasks diff --git a/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs b/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs index f4de7503c72c..4af25cf314c6 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs @@ -1,9 +1,7 @@ use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; use serde_json::Value; -use crate::agents::{ - sub_agent_execution_tool::lib::execute_tasks, tool_execution::ToolCallResult, -}; +use crate::agents::{sub_agent_execution_tool::lib::execute_tasks, tool_execution::ToolCallResult}; pub const SUB_AGENT_EXECUTE_TASK_TOOL_NAME: &str = "sub_agent__execute_task"; pub fn create_sub_agent_execute_task_tool() -> Tool { diff --git a/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs b/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs index 73537a80e87d..b764ea457e08 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs @@ -1,9 +1,9 @@ -use tokio::process::Command; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::time::timeout; +use serde_json::{json, Value}; use std::process::Stdio; use std::time::Duration; -use serde_json::{json, Value}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; +use tokio::time::timeout; use crate::agents::sub_agent_execution_tool::types::{Task, TaskResult}; @@ -40,7 +40,6 @@ async fn execute_task(task: Task) -> Result { let mut command = if task.task_type == "sub_recipe" { let sub_recipe = task.payload.get("sub_recipe").unwrap(); - let name = sub_recipe.get("name").unwrap().as_str().unwrap(); let path = sub_recipe.get("recipe_path").unwrap().as_str().unwrap(); let command_parameters = sub_recipe.get("command_parameters").unwrap(); let mut cmd = Command::new("goose"); @@ -49,12 +48,18 @@ async fn execute_task(task: Task) -> Result { for (key, value) in params_map { let key_str = key.to_string(); let value_str = value.as_str().unwrap_or(&value.to_string()).to_string(); - cmd.arg("--params").arg(format!("{}={}", key_str, value_str)); + cmd.arg("--params") + .arg(format!("{}={}", key_str, value_str)); } } cmd } else { - let text = task.payload.get("text_instruction").unwrap().as_str().unwrap(); + let text = task + .payload + .get("text_instruction") + .unwrap() + .as_str() + .unwrap(); let mut cmd = Command::new("goose"); cmd.arg("run").arg("--text").arg(text); cmd @@ -65,7 +70,9 @@ async fn execute_task(task: Task) -> Result { command.stderr(Stdio::piped()); // Spawn the child process - let mut child = command.spawn().map_err(|e| format!("Failed to spawn goose: {}", e))?; + let mut child = command + .spawn() + .map_err(|e| format!("Failed to spawn goose: {}", e))?; // Pipe the stdout if let Some(stdout) = child.stdout.take() { @@ -79,10 +86,16 @@ async fn execute_task(task: Task) -> Result { } // Await final status - let status = child.wait().await.map_err(|e| format!("Failed to wait on goose: {}", e))?; + let status = child + .wait() + .await + .map_err(|e| format!("Failed to wait on goose: {}", e))?; if !status.success() { - return Err(format!("Goose command failed with exit code: {:?}", status.code())); + return Err(format!( + "Goose command failed with exit code: {:?}", + status.code() + )); } Ok(json!({ "output": "Goose command completed." })) -} \ No newline at end of file +} diff --git a/crates/goose/src/agents/sub_agent_execution_tool/workers.rs b/crates/goose/src/agents/sub_agent_execution_tool/workers.rs index 6efe5a7a6b89..a07f78e7e6e3 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/workers.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/workers.rs @@ -6,12 +6,10 @@ use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; pub struct SharedState { - pub task_sender: mpsc::Sender, pub task_receiver: Arc>>, pub result_sender: mpsc::Sender, pub active_workers: Arc, pub should_stop: Arc, - pub total_tasks: Arc, pub completed_tasks: Arc, } diff --git a/crates/goose/src/agents/sub_recipe_manager.rs b/crates/goose/src/agents/sub_recipe_manager.rs index 32b4ed4f9ff2..2441684b4b0e 100644 --- a/crates/goose/src/agents/sub_recipe_manager.rs +++ b/crates/goose/src/agents/sub_recipe_manager.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use crate::{ agents::{ recipe_tools::sub_recipe_tools::{ - create_sub_recipe_task, create_sub_recipe_task_tool, create_sub_recipe_tool, run_sub_recipe, SUB_RECIPE_TASK_TOOL_NAME_PREFIX, SUB_RECIPE_TOOL_NAME_PREFIX + create_sub_recipe_task, create_sub_recipe_task_tool, SUB_RECIPE_TASK_TOOL_NAME_PREFIX, }, tool_execution::ToolCallResult, }, @@ -112,9 +112,11 @@ impl SubRecipeManager { ToolError::InvalidParameters(format!("Sub-recipe '{}' not found", sub_recipe_name)) })?; - let output = create_sub_recipe_task(sub_recipe, params).await.map_err(|e| { - ToolError::ExecutionError(format!("Sub-recipe execution failed: {}", e)) - })?; + let output = create_sub_recipe_task(sub_recipe, params) + .await + .map_err(|e| { + ToolError::ExecutionError(format!("Sub-recipe execution failed: {}", e)) + })?; Ok(vec![Content::text(output)]) } } From d5a45a97268e31d3b566528a46d985f6c8e5218c Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Thu, 3 Jul 2025 13:39:41 +1000 Subject: [PATCH 07/14] merge main --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e56a461ad0eb..816d0f0481a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5296,9 +5296,9 @@ dependencies = [ [[package]] name = "macro_rules_attribute" -version = "0.2.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a82271f7bc033d84bbca59a3ce3e4159938cb08a9c3aebbe54d215131518a13" +checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520" dependencies = [ "macro_rules_attribute-proc_macro", "paste", @@ -5306,9 +5306,9 @@ dependencies = [ [[package]] name = "macro_rules_attribute-proc_macro" -version = "0.2.0" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dd856d451cc0da70e2ef2ce95a18e39a93b7558bedf10201ad28503f918568" +checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30" [[package]] name = "malloc_buf" From a7cab6eea8934bf6aebb0b917e434b67e24daf63 Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Thu, 3 Jul 2025 14:04:01 +1000 Subject: [PATCH 08/14] fixed format --- crates/goose/src/agents/agent.rs | 1 - .../sub_agent_execution_tool/executor.rs | 4 +- .../sub_agent_execution_tool/workers.rs | 47 +++++++++++++++++-- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index a59471cd95c2..cde1c8353dc3 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -586,7 +586,6 @@ impl Agent { } prefixed_tools.push(sub_agent_execute_task_tool::create_sub_agent_execute_task_tool()); } - prefixed_tools } diff --git a/crates/goose/src/agents/sub_agent_execution_tool/executor.rs b/crates/goose/src/agents/sub_agent_execution_tool/executor.rs index 2104b221ced7..f6e568b72d86 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/executor.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/executor.rs @@ -55,8 +55,10 @@ pub async fn parallel_execute(tasks: Vec, config: Config) -> ExecutionResp drop(task_tx); // Start initial workers + let mut worker_handles = Vec::new(); for i in 0..config.initial_workers { - spawn_worker(shared_state.clone(), i, config.timeout_seconds); + let handle = spawn_worker(shared_state.clone(), i, config.timeout_seconds); + worker_handles.push(handle); } // Start the scaler diff --git a/crates/goose/src/agents/sub_agent_execution_tool/workers.rs b/crates/goose/src/agents/sub_agent_execution_tool/workers.rs index a07f78e7e6e3..86169073a139 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/workers.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/workers.rs @@ -5,6 +5,41 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; +#[cfg(test)] +mod tests { + use super::*; + use crate::agents::sub_agent_execution_tool::types::Task; + + #[tokio::test] + async fn test_spawn_worker_returns_handle() { + // Create a simple shared state for testing + let (task_tx, task_rx) = mpsc::channel::(1); + let (result_tx, _result_rx) = mpsc::channel::(1); + + let shared_state = Arc::new(SharedState { + task_receiver: Arc::new(tokio::sync::Mutex::new(task_rx)), + result_sender: result_tx, + active_workers: Arc::new(AtomicUsize::new(0)), + should_stop: Arc::new(AtomicBool::new(false)), + completed_tasks: Arc::new(AtomicUsize::new(0)), + }); + + // Test that spawn_worker returns a JoinHandle + let handle = spawn_worker(shared_state.clone(), 0, 5); + + // Verify it's a JoinHandle by checking we can abort it + assert!(!handle.is_finished()); + + // Signal stop and close the channel to let the worker exit + shared_state.should_stop.store(true, Ordering::SeqCst); + drop(task_tx); // Close the channel + + // Wait for the worker to finish + let result = handle.await; + assert!(result.is_ok()); + } +} + pub struct SharedState { pub task_receiver: Arc>>, pub result_sender: mpsc::Sender, @@ -14,12 +49,16 @@ pub struct SharedState { } // Spawn a worker task -pub fn spawn_worker(state: Arc, worker_id: usize, timeout_seconds: u64) { +pub fn spawn_worker( + state: Arc, + worker_id: usize, + timeout_seconds: u64, +) -> tokio::task::JoinHandle<()> { state.active_workers.fetch_add(1, Ordering::SeqCst); tokio::spawn(async move { worker_loop(state, worker_id, timeout_seconds).await; - }); + }) } async fn worker_loop(state: Arc, _worker_id: usize, timeout_seconds: u64) { @@ -75,7 +114,7 @@ pub async fn run_scaler( // Simple scaling logic: spawn worker if many pending tasks and under limit if pending > active * 2 && active < max_workers && worker_count < max_workers { - spawn_worker(state.clone(), worker_count, timeout_seconds); + let _handle = spawn_worker(state.clone(), worker_count, timeout_seconds); worker_count += 1; } @@ -87,7 +126,7 @@ pub async fn run_scaler( // If no active workers and tasks remaining, spawn one if active == 0 && pending > 0 { - spawn_worker(state.clone(), worker_count, timeout_seconds); + let _handle = spawn_worker(state.clone(), worker_count, timeout_seconds); worker_count += 1; } } From 9b5414e6109c43d115ae38761d57911b190c74e9 Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Thu, 3 Jul 2025 15:50:20 +1000 Subject: [PATCH 09/14] use the same output pattern as previous sub recipe implementation --- crates/goose/src/agents/agent.rs | 4 -- .../agents/recipe_tools/sub_recipe_tools.rs | 5 -- .../sub_agent_execute_task_tool.rs | 4 +- .../agents/sub_agent_execution_tool/tasks.rs | 58 ++++++++++++------- 4 files changed, 40 insertions(+), 31 deletions(-) diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index cde1c8353dc3..e80405de5751 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -292,10 +292,6 @@ impl Agent { let extension_manager = self.extension_manager.read().await; let sub_recipe_manager = self.sub_recipe_manager.lock().await; let result: ToolCallResult = if sub_recipe_manager.is_sub_recipe_tool(&tool_call.name) { - println!( - "==========Dispatching sub recipe tool call: {}", - tool_call.name - ); sub_recipe_manager .dispatch_sub_recipe_tool_call(&tool_call.name, tool_call.arguments.clone()) .await diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs index 546d6497f2b4..e1ab0d4391ef 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs @@ -95,10 +95,6 @@ fn prepare_command_params( } pub async fn create_sub_recipe_task(sub_recipe: &SubRecipe, params: Value) -> Result { - println!( - "==========Creating task for sub recipe: {}", - sub_recipe.name - ); let command_params = prepare_command_params(sub_recipe, params)?; let payload = json!({ "sub_recipe": { @@ -114,7 +110,6 @@ pub async fn create_sub_recipe_task(sub_recipe: &SubRecipe, params: Value) -> Re }; let task_json = serde_json::to_string(&task) .map_err(|e| anyhow::anyhow!("Failed to serialize Task: {}", e))?; - println!("==========Created task: {}", task_json); Ok(task_json) } diff --git a/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs b/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs index 4af25cf314c6..bbc6ffb78819 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs @@ -3,11 +3,11 @@ use serde_json::Value; use crate::agents::{sub_agent_execution_tool::lib::execute_tasks, tool_execution::ToolCallResult}; -pub const SUB_AGENT_EXECUTE_TASK_TOOL_NAME: &str = "sub_agent__execute_task"; +pub const SUB_AGENT_EXECUTE_TASK_TOOL_NAME: &str = "sub_recipe__execute_task"; pub fn create_sub_agent_execute_task_tool() -> Tool { Tool::new( SUB_AGENT_EXECUTE_TASK_TOOL_NAME, - "Only use this tool when you want to execute sub agent task or sub recipe task. + "Only use this tool when you want to execute sub recipe task. **DO NOT** use this tool when you want to execute sub agent task. If the tasks are not specified to be executed in parallel, you should use this tool to run each task immediately by passing a single task to the tool for each run. If you want to execute tasks in parallel, you should pass a list of tasks to the tool.", serde_json::json!({ diff --git a/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs b/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs index b764ea457e08..fe59ad108ddf 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs +++ b/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs @@ -1,4 +1,4 @@ -use serde_json::{json, Value}; +use serde_json::Value; use std::process::Stdio; use std::time::Duration; use tokio::io::{AsyncBufReadExt, BufReader}; @@ -36,12 +36,13 @@ pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { } async fn execute_task(task: Task) -> Result { - println!("=======Executing task: {:?}", task); - + let mut output_identifier = task.id.clone(); let mut command = if task.task_type == "sub_recipe" { let sub_recipe = task.payload.get("sub_recipe").unwrap(); + let sub_recipe_name = sub_recipe.get("name").unwrap().as_str().unwrap(); let path = sub_recipe.get("recipe_path").unwrap().as_str().unwrap(); let command_parameters = sub_recipe.get("command_parameters").unwrap(); + output_identifier = format!("sub-recipe {}", sub_recipe_name); let mut cmd = Command::new("goose"); cmd.arg("run").arg("--recipe").arg(path); if let Some(params_map) = command_parameters.as_object() { @@ -74,28 +75,45 @@ async fn execute_task(task: Task) -> Result { .spawn() .map_err(|e| format!("Failed to spawn goose: {}", e))?; - // Pipe the stdout - if let Some(stdout) = child.stdout.take() { - let reader = BufReader::new(stdout); - let mut lines = reader.lines(); + let stdout = child.stdout.take().expect("Failed to capture stdout"); + let stderr = child.stderr.take().expect("Failed to capture stderr"); + + let mut stdout_reader = BufReader::new(stdout).lines(); + let mut stderr_reader = BufReader::new(stderr).lines(); - println!("--- Goose output ---"); - while let Ok(Some(line)) = lines.next_line().await { - println!("{}", line); + // Spawn background tasks to read from stdout and stderr + let output_identifier_clone = output_identifier.clone(); + let stdout_task = tokio::spawn(async move { + let mut buffer = String::new(); + while let Ok(Some(line)) = stdout_reader.next_line().await { + println!("[{}] {}", output_identifier_clone, line); + buffer.push_str(&line); + buffer.push('\n'); } - } + buffer + }); + + let stderr_task = tokio::spawn(async move { + let mut buffer = String::new(); + while let Ok(Some(line)) = stderr_reader.next_line().await { + eprintln!("[stderr for {}] {}", output_identifier, line); + buffer.push_str(&line); + buffer.push('\n'); + } + buffer + }); - // Await final status let status = child .wait() .await - .map_err(|e| format!("Failed to wait on goose: {}", e))?; - if !status.success() { - return Err(format!( - "Goose command failed with exit code: {:?}", - status.code() - )); - } + .map_err(|e| format!("Failed to wait for process: {}", e))?; + + let stdout_output = stdout_task.await.unwrap(); + let stderr_output = stderr_task.await.unwrap(); - Ok(json!({ "output": "Goose command completed." })) + if status.success() { + Ok(Value::String(stdout_output)) + } else { + Err(format!("Command failed:\n{}", stderr_output)) + } } From 17e04e51950731aa29e2ca32fc8294e0cc45eb98 Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Thu, 3 Jul 2025 20:33:12 +1000 Subject: [PATCH 10/14] moved the stateless functions to session utils module --- crates/goose-cli/src/session/mod.rs | 89 ++----------------------- crates/goose-cli/src/session/utils.rs | 96 +++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 83 deletions(-) create mode 100644 crates/goose-cli/src/session/utils.rs diff --git a/crates/goose-cli/src/session/mod.rs b/crates/goose-cli/src/session/mod.rs index 4b1485e0bf77..119d6306ebee 100644 --- a/crates/goose-cli/src/session/mod.rs +++ b/crates/goose-cli/src/session/mod.rs @@ -5,6 +5,7 @@ mod input; mod output; mod prompt; mod thinking; +mod utils; pub use self::export::message_to_markdown; pub use builder::{build_session, SessionBuilderConfig, SessionSettings}; @@ -72,41 +73,10 @@ impl CompletionCache { } } -pub enum PlannerResponseType { - Plan, - ClarifyingQuestions, -} - -/// Decide if the planner's reponse is a plan or a clarifying question -/// -/// This function is called after the planner has generated a response -/// to the user's message. The response is either a plan or a clarifying -/// question. -pub async fn classify_planner_response( - message_text: String, - provider: Arc, -) -> Result { - let prompt = format!("The text below is the output from an AI model which can either provide a plan or list of clarifying questions. Based on the text below, decide if the output is a \"plan\" or \"clarifying questions\".\n---\n{message_text}"); - - // Generate the description - let message = Message::user().with_text(&prompt); - let (result, _usage) = provider - .complete( - "Reply only with the classification label: \"plan\" or \"clarifying questions\"", - &[message], - &[], - ) - .await?; - - // println!("classify_planner_response: {result:?}\n"); // TODO: remove - - let predicted = result.as_concat_text(); - if predicted.to_lowercase().contains("plan") { - Ok(PlannerResponseType::Plan) - } else { - Ok(PlannerResponseType::ClarifyingQuestions) - } -} +// Re-export types and functions from utils module for backward compatibility +pub use utils::{ + classify_planner_response, get_reasoner, summarize_context_messages, PlannerResponseType, +}; impl Session { pub fn new( @@ -141,21 +111,6 @@ impl Session { } } - /// Helper function to summarize context messages - async fn summarize_context_messages( - messages: &mut Vec, - agent: &Agent, - message_suffix: &str, - ) -> Result<()> { - // Summarize messages to fit within context length - let (summarized_messages, _) = agent.summarize_context(messages).await?; - let msg = format!("Context maxed out\n{}\n{}", "-".repeat(50), message_suffix); - output::render_text(&msg, Some(Color::Yellow), true); - *messages = summarized_messages; - - Ok(()) - } - /// Add a stdio extension to the session /// /// # Arguments @@ -890,7 +845,7 @@ impl Session { } else { "Goose automatically summarized messages to continue processing." }; - Self::summarize_context_messages(&mut self.messages, &self.agent, message_suffix).await?; + summarize_context_messages(&mut self.messages, &self.agent, message_suffix).await?; } _ => { unreachable!() @@ -1401,35 +1356,3 @@ impl Session { Ok(path) } } - -fn get_reasoner() -> Result, anyhow::Error> { - use goose::model::ModelConfig; - use goose::providers::create; - - let config = Config::global(); - - // Try planner-specific provider first, fallback to default provider - let provider = if let Ok(provider) = config.get_param::("GOOSE_PLANNER_PROVIDER") { - provider - } else { - println!("WARNING: GOOSE_PLANNER_PROVIDER not found. Using default provider..."); - config - .get_param::("GOOSE_PROVIDER") - .expect("No provider configured. Run 'goose configure' first") - }; - - // Try planner-specific model first, fallback to default model - let model = if let Ok(model) = config.get_param::("GOOSE_PLANNER_MODEL") { - model - } else { - println!("WARNING: GOOSE_PLANNER_MODEL not found. Using default model..."); - config - .get_param::("GOOSE_MODEL") - .expect("No model configured. Run 'goose configure' first") - }; - - let model_config = ModelConfig::new(model); - let reasoner = create(&provider, model_config)?; - - Ok(reasoner) -} diff --git a/crates/goose-cli/src/session/utils.rs b/crates/goose-cli/src/session/utils.rs new file mode 100644 index 000000000000..020078481736 --- /dev/null +++ b/crates/goose-cli/src/session/utils.rs @@ -0,0 +1,96 @@ +use anyhow::Result; +use console::Color; +use goose::agents::Agent; +use goose::config::Config; +use goose::message::Message; +use goose::model::ModelConfig; +use goose::providers::base::Provider; +use goose::providers::create; +use std::sync::Arc; + +use crate::session::output; + +pub enum PlannerResponseType { + Plan, + ClarifyingQuestions, +} + +/// Decide if the planner's response is a plan or a clarifying question +/// +/// This function is called after the planner has generated a response +/// to the user's message. The response is either a plan or a clarifying +/// question. +pub async fn classify_planner_response( + message_text: String, + provider: Arc, +) -> Result { + let prompt = format!("The text below is the output from an AI model which can either provide a plan or list of clarifying questions. Based on the text below, decide if the output is a \"plan\" or \"clarifying questions\".\n---\n{message_text}"); + + // Generate the description + let message = Message::user().with_text(&prompt); + let (result, _usage) = provider + .complete( + "Reply only with the classification label: \"plan\" or \"clarifying questions\"", + &[message], + &[], + ) + .await?; + + // println!("classify_planner_response: {result:?}\n"); // TODO: remove + + let predicted = result.as_concat_text(); + if predicted.to_lowercase().contains("plan") { + Ok(PlannerResponseType::Plan) + } else { + Ok(PlannerResponseType::ClarifyingQuestions) + } +} + +/// Get a reasoner provider based on configuration +/// +/// Tries planner-specific provider/model first, falls back to default provider/model +pub fn get_reasoner() -> Result, anyhow::Error> { + let config = Config::global(); + + // Try planner-specific provider first, fallback to default provider + let provider = if let Ok(provider) = config.get_param::("GOOSE_PLANNER_PROVIDER") { + provider + } else { + println!("WARNING: GOOSE_PLANNER_PROVIDER not found. Using default provider..."); + config + .get_param::("GOOSE_PROVIDER") + .expect("No provider configured. Run 'goose configure' first") + }; + + // Try planner-specific model first, fallback to default model + let model = if let Ok(model) = config.get_param::("GOOSE_PLANNER_MODEL") { + model + } else { + println!("WARNING: GOOSE_PLANNER_MODEL not found. Using default model..."); + config + .get_param::("GOOSE_MODEL") + .expect("No model configured. Run 'goose configure' first") + }; + + let model_config = ModelConfig::new(model); + let reasoner = create(&provider, model_config)?; + + Ok(reasoner) +} + +/// Helper function to summarize context messages +/// +/// This is a stateless utility function that summarizes messages to fit within context length +pub async fn summarize_context_messages( + messages: &mut Vec, + agent: &Agent, + message_suffix: &str, +) -> Result<()> { + // Summarize messages to fit within context length + let (summarized_messages, _) = agent.summarize_context(messages).await?; + let msg = format!("Context maxed out\n{}\n{}", "-".repeat(50), message_suffix); + output::render_text(&msg, Some(Color::Yellow), true); + *messages = summarized_messages; + + Ok(()) +} From 684ba42452e518e155f4c4a25f236619f98dd7ce Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Thu, 3 Jul 2025 20:33:46 +1000 Subject: [PATCH 11/14] Revert "moved the stateless functions to session utils module" This reverts commit 17e04e51950731aa29e2ca32fc8294e0cc45eb98. --- crates/goose-cli/src/session/mod.rs | 89 +++++++++++++++++++++++-- crates/goose-cli/src/session/utils.rs | 96 --------------------------- 2 files changed, 83 insertions(+), 102 deletions(-) delete mode 100644 crates/goose-cli/src/session/utils.rs diff --git a/crates/goose-cli/src/session/mod.rs b/crates/goose-cli/src/session/mod.rs index 119d6306ebee..4b1485e0bf77 100644 --- a/crates/goose-cli/src/session/mod.rs +++ b/crates/goose-cli/src/session/mod.rs @@ -5,7 +5,6 @@ mod input; mod output; mod prompt; mod thinking; -mod utils; pub use self::export::message_to_markdown; pub use builder::{build_session, SessionBuilderConfig, SessionSettings}; @@ -73,10 +72,41 @@ impl CompletionCache { } } -// Re-export types and functions from utils module for backward compatibility -pub use utils::{ - classify_planner_response, get_reasoner, summarize_context_messages, PlannerResponseType, -}; +pub enum PlannerResponseType { + Plan, + ClarifyingQuestions, +} + +/// Decide if the planner's reponse is a plan or a clarifying question +/// +/// This function is called after the planner has generated a response +/// to the user's message. The response is either a plan or a clarifying +/// question. +pub async fn classify_planner_response( + message_text: String, + provider: Arc, +) -> Result { + let prompt = format!("The text below is the output from an AI model which can either provide a plan or list of clarifying questions. Based on the text below, decide if the output is a \"plan\" or \"clarifying questions\".\n---\n{message_text}"); + + // Generate the description + let message = Message::user().with_text(&prompt); + let (result, _usage) = provider + .complete( + "Reply only with the classification label: \"plan\" or \"clarifying questions\"", + &[message], + &[], + ) + .await?; + + // println!("classify_planner_response: {result:?}\n"); // TODO: remove + + let predicted = result.as_concat_text(); + if predicted.to_lowercase().contains("plan") { + Ok(PlannerResponseType::Plan) + } else { + Ok(PlannerResponseType::ClarifyingQuestions) + } +} impl Session { pub fn new( @@ -111,6 +141,21 @@ impl Session { } } + /// Helper function to summarize context messages + async fn summarize_context_messages( + messages: &mut Vec, + agent: &Agent, + message_suffix: &str, + ) -> Result<()> { + // Summarize messages to fit within context length + let (summarized_messages, _) = agent.summarize_context(messages).await?; + let msg = format!("Context maxed out\n{}\n{}", "-".repeat(50), message_suffix); + output::render_text(&msg, Some(Color::Yellow), true); + *messages = summarized_messages; + + Ok(()) + } + /// Add a stdio extension to the session /// /// # Arguments @@ -845,7 +890,7 @@ impl Session { } else { "Goose automatically summarized messages to continue processing." }; - summarize_context_messages(&mut self.messages, &self.agent, message_suffix).await?; + Self::summarize_context_messages(&mut self.messages, &self.agent, message_suffix).await?; } _ => { unreachable!() @@ -1356,3 +1401,35 @@ impl Session { Ok(path) } } + +fn get_reasoner() -> Result, anyhow::Error> { + use goose::model::ModelConfig; + use goose::providers::create; + + let config = Config::global(); + + // Try planner-specific provider first, fallback to default provider + let provider = if let Ok(provider) = config.get_param::("GOOSE_PLANNER_PROVIDER") { + provider + } else { + println!("WARNING: GOOSE_PLANNER_PROVIDER not found. Using default provider..."); + config + .get_param::("GOOSE_PROVIDER") + .expect("No provider configured. Run 'goose configure' first") + }; + + // Try planner-specific model first, fallback to default model + let model = if let Ok(model) = config.get_param::("GOOSE_PLANNER_MODEL") { + model + } else { + println!("WARNING: GOOSE_PLANNER_MODEL not found. Using default model..."); + config + .get_param::("GOOSE_MODEL") + .expect("No model configured. Run 'goose configure' first") + }; + + let model_config = ModelConfig::new(model); + let reasoner = create(&provider, model_config)?; + + Ok(reasoner) +} diff --git a/crates/goose-cli/src/session/utils.rs b/crates/goose-cli/src/session/utils.rs deleted file mode 100644 index 020078481736..000000000000 --- a/crates/goose-cli/src/session/utils.rs +++ /dev/null @@ -1,96 +0,0 @@ -use anyhow::Result; -use console::Color; -use goose::agents::Agent; -use goose::config::Config; -use goose::message::Message; -use goose::model::ModelConfig; -use goose::providers::base::Provider; -use goose::providers::create; -use std::sync::Arc; - -use crate::session::output; - -pub enum PlannerResponseType { - Plan, - ClarifyingQuestions, -} - -/// Decide if the planner's response is a plan or a clarifying question -/// -/// This function is called after the planner has generated a response -/// to the user's message. The response is either a plan or a clarifying -/// question. -pub async fn classify_planner_response( - message_text: String, - provider: Arc, -) -> Result { - let prompt = format!("The text below is the output from an AI model which can either provide a plan or list of clarifying questions. Based on the text below, decide if the output is a \"plan\" or \"clarifying questions\".\n---\n{message_text}"); - - // Generate the description - let message = Message::user().with_text(&prompt); - let (result, _usage) = provider - .complete( - "Reply only with the classification label: \"plan\" or \"clarifying questions\"", - &[message], - &[], - ) - .await?; - - // println!("classify_planner_response: {result:?}\n"); // TODO: remove - - let predicted = result.as_concat_text(); - if predicted.to_lowercase().contains("plan") { - Ok(PlannerResponseType::Plan) - } else { - Ok(PlannerResponseType::ClarifyingQuestions) - } -} - -/// Get a reasoner provider based on configuration -/// -/// Tries planner-specific provider/model first, falls back to default provider/model -pub fn get_reasoner() -> Result, anyhow::Error> { - let config = Config::global(); - - // Try planner-specific provider first, fallback to default provider - let provider = if let Ok(provider) = config.get_param::("GOOSE_PLANNER_PROVIDER") { - provider - } else { - println!("WARNING: GOOSE_PLANNER_PROVIDER not found. Using default provider..."); - config - .get_param::("GOOSE_PROVIDER") - .expect("No provider configured. Run 'goose configure' first") - }; - - // Try planner-specific model first, fallback to default model - let model = if let Ok(model) = config.get_param::("GOOSE_PLANNER_MODEL") { - model - } else { - println!("WARNING: GOOSE_PLANNER_MODEL not found. Using default model..."); - config - .get_param::("GOOSE_MODEL") - .expect("No model configured. Run 'goose configure' first") - }; - - let model_config = ModelConfig::new(model); - let reasoner = create(&provider, model_config)?; - - Ok(reasoner) -} - -/// Helper function to summarize context messages -/// -/// This is a stateless utility function that summarizes messages to fit within context length -pub async fn summarize_context_messages( - messages: &mut Vec, - agent: &Agent, - message_suffix: &str, -) -> Result<()> { - // Summarize messages to fit within context length - let (summarized_messages, _) = agent.summarize_context(messages).await?; - let msg = format!("Context maxed out\n{}\n{}", "-".repeat(50), message_suffix); - output::render_text(&msg, Some(Color::Yellow), true); - *messages = summarized_messages; - - Ok(()) -} From 0dc0b441c5832fbc8c71667b0fdb97689f8f74e7 Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Fri, 4 Jul 2025 11:23:27 +1000 Subject: [PATCH 12/14] renamed to sub-recipe-execute-tool --- crates/goose/src/agents/agent.rs | 11 ++++++----- crates/goose/src/agents/mod.rs | 2 +- .../goose/src/agents/recipe_tools/sub_recipe_tools.rs | 2 +- .../executor.rs | 6 +++--- .../lib.rs | 6 +++--- .../mod.rs | 2 +- .../sub_recipe_execute_task_tool.rs} | 10 ++++++---- .../tasks.rs | 2 +- .../types.rs | 0 .../workers.rs | 6 +++--- 10 files changed, 25 insertions(+), 22 deletions(-) rename crates/goose/src/agents/{sub_agent_execution_tool => sub_recipe_execution_tool}/executor.rs (93%) rename crates/goose/src/agents/{sub_agent_execution_tool => sub_recipe_execution_tool}/lib.rs (83%) rename crates/goose/src/agents/{sub_agent_execution_tool => sub_recipe_execution_tool}/mod.rs (62%) rename crates/goose/src/agents/{sub_agent_execution_tool/sub_agent_execute_task_tool.rs => sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs} (93%) rename crates/goose/src/agents/{sub_agent_execution_tool => sub_recipe_execution_tool}/tasks.rs (98%) rename crates/goose/src/agents/{sub_agent_execution_tool => sub_recipe_execution_tool}/types.rs (100%) rename crates/goose/src/agents/{sub_agent_execution_tool => sub_recipe_execution_tool}/workers.rs (95%) diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index 03618b17fd79..3ba407632296 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -9,8 +9,8 @@ use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; use mcp_core::protocol::JsonRpcMessage; use crate::agents::final_output_tool::{FINAL_OUTPUT_CONTINUATION_MESSAGE, FINAL_OUTPUT_TOOL_NAME}; -use crate::agents::sub_agent_execution_tool::sub_agent_execute_task_tool::{ - self, SUB_AGENT_EXECUTE_TASK_TOOL_NAME, +use crate::agents::sub_recipe_execution_tool::sub_recipe_execute_task_tool::{ + self, SUB_RECIPE_EXECUTE_TASK_TOOL_NAME, }; use crate::agents::sub_recipe_manager::SubRecipeManager; use crate::config::{Config, ExtensionConfigManager, PermissionManager}; @@ -293,8 +293,8 @@ impl Agent { sub_recipe_manager .dispatch_sub_recipe_tool_call(&tool_call.name, tool_call.arguments.clone()) .await - } else if tool_call.name == SUB_AGENT_EXECUTE_TASK_TOOL_NAME { - sub_agent_execute_task_tool::run_tasks(tool_call.arguments.clone()).await + } else if tool_call.name == SUB_RECIPE_EXECUTE_TASK_TOOL_NAME { + sub_recipe_execute_task_tool::run_tasks(tool_call.arguments.clone()).await } else if tool_call.name == PLATFORM_READ_RESOURCE_TOOL_NAME { // Check if the tool is read_resource and handle it separately ToolCallResult::from( @@ -578,7 +578,8 @@ impl Agent { if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() { prefixed_tools.push(final_output_tool.tool()); } - prefixed_tools.push(sub_agent_execute_task_tool::create_sub_agent_execute_task_tool()); + prefixed_tools + .push(sub_recipe_execute_task_tool::create_sub_recipe_execute_task_tool()); } prefixed_tools diff --git a/crates/goose/src/agents/mod.rs b/crates/goose/src/agents/mod.rs index e66c99399f56..353e57acde12 100644 --- a/crates/goose/src/agents/mod.rs +++ b/crates/goose/src/agents/mod.rs @@ -11,7 +11,7 @@ mod reply_parts; mod router_tool_selector; mod router_tools; mod schedule_tool; -pub mod sub_agent_execution_tool; +pub mod sub_recipe_execution_tool; pub mod sub_recipe_manager; pub mod subagent; pub mod subagent_handler; diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs index e1ab0d4391ef..928cf8bd0845 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs @@ -4,7 +4,7 @@ use anyhow::Result; use mcp_core::tool::{Tool, ToolAnnotations}; use serde_json::{json, Map, Value}; -use crate::agents::sub_agent_execution_tool::lib::Task; +use crate::agents::sub_recipe_execution_tool::lib::Task; use crate::recipe::{Recipe, RecipeParameter, RecipeParameterRequirement, SubRecipe}; pub const SUB_RECIPE_TASK_TOOL_NAME_PREFIX: &str = "subrecipe__create_task"; diff --git a/crates/goose/src/agents/sub_agent_execution_tool/executor.rs b/crates/goose/src/agents/sub_recipe_execution_tool/executor.rs similarity index 93% rename from crates/goose/src/agents/sub_agent_execution_tool/executor.rs rename to crates/goose/src/agents/sub_recipe_execution_tool/executor.rs index f6e568b72d86..b796d412984d 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/executor.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/executor.rs @@ -3,11 +3,11 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::Instant; -use crate::agents::sub_agent_execution_tool::lib::{ +use crate::agents::sub_recipe_execution_tool::lib::{ Config, ExecutionResponse, ExecutionStats, Task, TaskResult, }; -use crate::agents::sub_agent_execution_tool::tasks::process_task; -use crate::agents::sub_agent_execution_tool::workers::{run_scaler, spawn_worker, SharedState}; +use crate::agents::sub_recipe_execution_tool::tasks::process_task; +use crate::agents::sub_recipe_execution_tool::workers::{run_scaler, spawn_worker, SharedState}; pub async fn execute_single_task(task: &Task, config: Config) -> ExecutionResponse { let start_time = Instant::now(); diff --git a/crates/goose/src/agents/sub_agent_execution_tool/lib.rs b/crates/goose/src/agents/sub_recipe_execution_tool/lib.rs similarity index 83% rename from crates/goose/src/agents/sub_agent_execution_tool/lib.rs rename to crates/goose/src/agents/sub_recipe_execution_tool/lib.rs index 4ecc6bb838f8..1ef094d88fe3 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/lib.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/lib.rs @@ -1,6 +1,6 @@ -use crate::agents::sub_agent_execution_tool::executor::execute_single_task; -pub use crate::agents::sub_agent_execution_tool::executor::parallel_execute; -pub use crate::agents::sub_agent_execution_tool::types::{ +use crate::agents::sub_recipe_execution_tool::executor::execute_single_task; +pub use crate::agents::sub_recipe_execution_tool::executor::parallel_execute; +pub use crate::agents::sub_recipe_execution_tool::types::{ Config, ExecutionResponse, ExecutionStats, Task, TaskResult, }; diff --git a/crates/goose/src/agents/sub_agent_execution_tool/mod.rs b/crates/goose/src/agents/sub_recipe_execution_tool/mod.rs similarity index 62% rename from crates/goose/src/agents/sub_agent_execution_tool/mod.rs rename to crates/goose/src/agents/sub_recipe_execution_tool/mod.rs index 360f36e301aa..a49791e2776f 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/mod.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/mod.rs @@ -1,6 +1,6 @@ mod executor; pub mod lib; -pub mod sub_agent_execute_task_tool; +pub mod sub_recipe_execute_task_tool; mod tasks; mod types; mod workers; diff --git a/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs b/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs similarity index 93% rename from crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs rename to crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs index bbc6ffb78819..09ca3b64d444 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/sub_agent_execute_task_tool.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs @@ -1,12 +1,14 @@ use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; use serde_json::Value; -use crate::agents::{sub_agent_execution_tool::lib::execute_tasks, tool_execution::ToolCallResult}; +use crate::agents::{ + sub_recipe_execution_tool::lib::execute_tasks, tool_execution::ToolCallResult, +}; -pub const SUB_AGENT_EXECUTE_TASK_TOOL_NAME: &str = "sub_recipe__execute_task"; -pub fn create_sub_agent_execute_task_tool() -> Tool { +pub const SUB_RECIPE_EXECUTE_TASK_TOOL_NAME: &str = "sub_recipe__execute_task"; +pub fn create_sub_recipe_execute_task_tool() -> Tool { Tool::new( - SUB_AGENT_EXECUTE_TASK_TOOL_NAME, + SUB_RECIPE_EXECUTE_TASK_TOOL_NAME, "Only use this tool when you want to execute sub recipe task. **DO NOT** use this tool when you want to execute sub agent task. If the tasks are not specified to be executed in parallel, you should use this tool to run each task immediately by passing a single task to the tool for each run. If you want to execute tasks in parallel, you should pass a list of tasks to the tool.", diff --git a/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs b/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs similarity index 98% rename from crates/goose/src/agents/sub_agent_execution_tool/tasks.rs rename to crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs index fe59ad108ddf..4e4584aa0b34 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/tasks.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs @@ -5,7 +5,7 @@ use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; use tokio::time::timeout; -use crate::agents::sub_agent_execution_tool::types::{Task, TaskResult}; +use crate::agents::sub_recipe_execution_tool::types::{Task, TaskResult}; // Process a single task based on its type pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { diff --git a/crates/goose/src/agents/sub_agent_execution_tool/types.rs b/crates/goose/src/agents/sub_recipe_execution_tool/types.rs similarity index 100% rename from crates/goose/src/agents/sub_agent_execution_tool/types.rs rename to crates/goose/src/agents/sub_recipe_execution_tool/types.rs diff --git a/crates/goose/src/agents/sub_agent_execution_tool/workers.rs b/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs similarity index 95% rename from crates/goose/src/agents/sub_agent_execution_tool/workers.rs rename to crates/goose/src/agents/sub_recipe_execution_tool/workers.rs index 86169073a139..e48f19c4d360 100644 --- a/crates/goose/src/agents/sub_agent_execution_tool/workers.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs @@ -1,5 +1,5 @@ -use crate::agents::sub_agent_execution_tool::tasks::process_task; -use crate::agents::sub_agent_execution_tool::types::{Task, TaskResult}; +use crate::agents::sub_recipe_execution_tool::tasks::process_task; +use crate::agents::sub_recipe_execution_tool::types::{Task, TaskResult}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::mpsc; @@ -8,7 +8,7 @@ use tokio::time::{sleep, Duration}; #[cfg(test)] mod tests { use super::*; - use crate::agents::sub_agent_execution_tool::types::Task; + use crate::agents::sub_recipe_execution_tool::types::Task; #[tokio::test] async fn test_spawn_worker_returns_handle() { From dbb6dbbd2fd4792318c1088619aab0434a0c789e Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Fri, 4 Jul 2025 11:28:58 +1000 Subject: [PATCH 13/14] cleaned tool description --- .../sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs b/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs index 09ca3b64d444..8093e1051d09 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs @@ -9,7 +9,7 @@ pub const SUB_RECIPE_EXECUTE_TASK_TOOL_NAME: &str = "sub_recipe__execute_task"; pub fn create_sub_recipe_execute_task_tool() -> Tool { Tool::new( SUB_RECIPE_EXECUTE_TASK_TOOL_NAME, - "Only use this tool when you want to execute sub recipe task. **DO NOT** use this tool when you want to execute sub agent task. + "Only use this tool when you want to execute sub recipe task. If the tasks are not specified to be executed in parallel, you should use this tool to run each task immediately by passing a single task to the tool for each run. If you want to execute tasks in parallel, you should pass a list of tasks to the tool.", serde_json::json!({ From 197d3d1704a7b3932ec1fea0228efd36f9ea793e Mon Sep 17 00:00:00 2001 From: Lifei Zhou Date: Fri, 4 Jul 2025 12:28:55 +1000 Subject: [PATCH 14/14] added extra property execution mode --- .../agents/sub_recipe_execution_tool/lib.rs | 28 +++++++++------ .../sub_recipe_execute_task_tool.rs | 34 ++++++++++++++++--- 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/lib.rs b/crates/goose/src/agents/sub_recipe_execution_tool/lib.rs index 1ef094d88fe3..9df784a46be0 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/lib.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/lib.rs @@ -6,7 +6,7 @@ pub use crate::agents::sub_recipe_execution_tool::types::{ use serde_json::Value; -pub async fn execute_tasks(input: Value) -> Result { +pub async fn execute_tasks(input: Value, execution_mode: &str) -> Result { let tasks: Vec = serde_json::from_value(input.get("tasks").ok_or("Missing tasks field")?.clone()) .map_err(|e| format!("Failed to parse tasks: {}", e))?; @@ -18,15 +18,21 @@ pub async fn execute_tasks(input: Value) -> Result { Config::default() }; let task_count = tasks.len(); - if task_count == 1 { - let response = execute_single_task(&tasks[0], config).await; - return serde_json::to_value(response) - .map_err(|e| format!("Failed to serialize response: {}", e)); + match execution_mode { + "sequential" => { + if task_count == 1 { + let response = execute_single_task(&tasks[0], config).await; + serde_json::to_value(response) + .map_err(|e| format!("Failed to serialize response: {}", e)) + } else { + Err("Sequential execution mode requires exactly one task".to_string()) + } + } + "parallel" => { + let response = parallel_execute(tasks, config).await; + serde_json::to_value(response) + .map_err(|e| format!("Failed to serialize response: {}", e)) + } + _ => Err("Invalid execution mode".to_string()), } - - // Execute tasks - let response = parallel_execute(tasks, config).await; - - // Convert response to JSON - serde_json::to_value(response).map_err(|e| format!("Failed to serialize response: {}", e)) } diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs b/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs index 8093e1051d09..46738b813b13 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/sub_recipe_execute_task_tool.rs @@ -9,12 +9,29 @@ pub const SUB_RECIPE_EXECUTE_TASK_TOOL_NAME: &str = "sub_recipe__execute_task"; pub fn create_sub_recipe_execute_task_tool() -> Tool { Tool::new( SUB_RECIPE_EXECUTE_TASK_TOOL_NAME, - "Only use this tool when you want to execute sub recipe task. - If the tasks are not specified to be executed in parallel, you should use this tool to run each task immediately by passing a single task to the tool for each run. - If you want to execute tasks in parallel, you should pass a list of tasks to the tool.", + "Only use this tool when you execute sub recipe task. +EXECUTION STRATEGY: +- DEFAULT: Execute tasks sequentially (one at a time) unless user explicitly requests parallel execution +- PARALLEL: Only when user explicitly uses keywords like 'parallel', 'simultaneously', 'at the same time', 'concurrently' + +IMPLEMENTATION: +- Sequential execution: Call this tool multiple times, passing exactly ONE task per call +- Parallel execution: Call this tool once, passing an ARRAY of all tasks + +EXAMPLES: +- User: 'get weather and tell me a joke' → Sequential (2 separate tool calls, 1 task each) +- User: 'get weather and joke in parallel' → Parallel (1 tool call with array of 2 tasks) +- User: 'run these simultaneously' → Parallel (1 tool call with task array) +- User: 'do task A then task B' → Sequential (2 separate tool calls)", serde_json::json!({ "type": "object", "properties": { + "execution_mode": { + "type": "string", + "enum": ["sequential", "parallel"], + "default": "sequential", + "description": "Execution strategy for multiple tasks. Use 'sequential' (default) unless user explicitly requests parallel execution with words like 'parallel', 'simultaneously', 'at the same time', or 'concurrently'." + }, "tasks": { "type": "array", "items": { @@ -26,6 +43,8 @@ pub fn create_sub_recipe_execute_task_tool() -> Tool { }, "task_type": { "type": "string", + "enum": ["sub_recipe", "text_instruction"], + "default": "sub_recipe", "description": "the type of task to execute, can be one of: sub_recipe, text_instruction" }, "payload": { @@ -88,7 +107,14 @@ pub fn create_sub_recipe_execute_task_tool() -> Tool { } pub async fn run_tasks(execute_data: Value) -> ToolCallResult { - match execute_tasks(execute_data).await { + let execute_data_clone = execute_data.clone(); + let default_execution_mode_value = Value::String("sequential".to_string()); + let execution_mode = execute_data_clone + .get("execution_mode") + .unwrap_or(&default_execution_mode_value) + .as_str() + .unwrap_or("sequential"); + match execute_tasks(execute_data, execution_mode).await { Ok(result) => { let output = serde_json::to_string(&result).unwrap(); ToolCallResult::from(Ok(vec![Content::text(output)]))