diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index 0742ba33fe1d..c67ae89e5789 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -75,8 +75,6 @@ pub struct Agent { pub(super) tool_monitor: Arc>>, pub(super) router_tool_selector: Mutex>>>, pub(super) scheduler_service: Mutex>>, - pub(super) mcp_tx: Mutex>, - pub(super) mcp_notification_rx: Arc>>, pub(super) retry_manager: RetryManager, } @@ -132,8 +130,6 @@ impl Agent { // Create channels with buffer size 32 (adjust if needed) let (confirm_tx, confirm_rx) = mpsc::channel(32); let (tool_tx, tool_rx) = mpsc::channel(32); - // Add MCP notification channel - let (mcp_tx, mcp_rx) = mpsc::channel(100); let tool_monitor = Arc::new(Mutex::new(None)); let retry_manager = RetryManager::with_tool_monitor(tool_monitor.clone()); @@ -154,9 +150,6 @@ impl Agent { tool_monitor, router_tool_selector: Mutex::new(None), scheduler_service: Mutex::new(None), - // Initialize with MCP notification support - mcp_tx: Mutex::new(mcp_tx), - mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)), retry_manager, } } @@ -342,9 +335,8 @@ impl Agent { .await } else if tool_call.name == SUBAGENT_EXECUTE_TASK_TOOL_NAME { let provider = self.provider().await.ok(); - let mcp_tx = self.mcp_tx.lock().await.clone(); - let task_config = TaskConfig::new(provider, mcp_tx); + let task_config = TaskConfig::new(provider); subagent_execute_task_tool::run_tasks( tool_call.arguments.clone(), task_config, @@ -771,24 +763,6 @@ impl Agent { break; } - // Handle MCP notifications from subagents - let mcp_notifications = self.get_mcp_notifications().await; - for notification in mcp_notifications { - if let JsonRpcMessage::Notification(notif) = ¬ification { - if let Some(data) = notif.notification.params.get("data") { - if let (Some(subagent_id), Some(_message)) = ( - data.get("subagent_id").and_then(|v| v.as_str()), - data.get("message").and_then(|v| v.as_str()), - ) { - yield AgentEvent::McpNotification(( - subagent_id.to_string(), - notification.clone(), - )); - } - } - } - } - let mut stream = Self::stream_response_from_provider( self.provider().await?, &system_prompt, @@ -1085,18 +1059,6 @@ impl Agent { prompt_manager.add_system_prompt_extra(instruction); } - /// Get MCP notifications from subagents - pub async fn get_mcp_notifications(&self) -> Vec { - let mut notifications = Vec::new(); - let mut rx = self.mcp_notification_rx.lock().await; - - while let Ok(notification) = rx.try_recv() { - notifications.push(notification); - } - - notifications - } - pub async fn update_provider(&self, provider: Arc) -> Result<()> { let mut current_provider = self.provider.lock().await; *current_provider = Some(provider.clone()); diff --git a/crates/goose/src/agents/subagent.rs b/crates/goose/src/agents/subagent.rs index d4150f2ff975..f761270fba25 100644 --- a/crates/goose/src/agents/subagent.rs +++ b/crates/goose/src/agents/subagent.rs @@ -1,3 +1,4 @@ +use crate::agents::subagent_task_config::DEFAULT_SUBAGENT_MAX_TURNS; use crate::{ agents::extension::ExtensionConfig, agents::{extension_manager::ExtensionManager, Agent, TaskConfig}, @@ -9,8 +10,6 @@ use crate::{ use anyhow::anyhow; use chrono::{DateTime, Utc}; use mcp_core::{handler::ToolError, tool::Tool}; -use rmcp::model::{JsonRpcMessage, JsonRpcNotification, JsonRpcVersion2_0, Notification}; -use rmcp::object; use serde::{Deserialize, Serialize}; // use serde_json::{self}; use std::{collections::HashMap, sync::Arc}; @@ -53,7 +52,7 @@ impl SubAgent { #[instrument(skip(task_config))] pub async fn new( task_config: TaskConfig, - ) -> Result<(Arc, tokio::task::JoinHandle<()>), anyhow::Error> { + ) -> Result, anyhow::Error> { debug!("Creating new subagent with id: {}", task_config.id); // Create a new extension manager for this subagent @@ -89,21 +88,8 @@ impl SubAgent { extension_manager: Arc::new(RwLock::new(extension_manager)), }); - // Send initial MCP notification - let subagent_clone = Arc::clone(&subagent); - subagent_clone - .send_mcp_notification("subagent_created", "Subagent created and ready") - .await; - - // Create a background task handle (for future use with streaming/monitoring) - let subagent_clone = Arc::clone(&subagent); - let handle = tokio::spawn(async move { - // This could be used for background monitoring, cleanup, etc. - debug!("Subagent {} background task started", subagent_clone.id); - }); - debug!("Subagent {} created successfully", subagent.id); - Ok((subagent, handle)) + Ok(subagent) } /// Get the current status of the subagent @@ -118,51 +104,6 @@ impl SubAgent { let mut current_status = self.status.write().await; *current_status = status.clone(); } // Write lock is released here! - - // Send MCP notifications based on status - match &status { - SubAgentStatus::Processing => { - self.send_mcp_notification("status_changed", "Processing request") - .await; - } - SubAgentStatus::Completed(msg) => { - self.send_mcp_notification("completed", &format!("Completed: {}", msg)) - .await; - } - SubAgentStatus::Terminated => { - self.send_mcp_notification("terminated", "Subagent terminated") - .await; - } - _ => {} - } - } - - /// Send an MCP notification about the subagent's activity - pub async fn send_mcp_notification(&self, notification_type: &str, message: &str) { - let notification = JsonRpcMessage::Notification(JsonRpcNotification { - jsonrpc: JsonRpcVersion2_0, - notification: Notification { - method: "notifications/message".to_string(), - params: object!({ - "level": "info", - "logger": format!("subagent_{}", self.id), - "data": { - "subagent_id": self.id, - "type": notification_type, - "message": message, - "timestamp": Utc::now().to_rfc3339() - } - }), - extensions: Default::default(), - }, - }); - - if let Err(e) = self.config.mcp_tx.send(notification).await { - error!( - "Failed to send MCP notification from subagent {}: {}", - self.id, e - ); - } } /// Get current progress information @@ -193,8 +134,6 @@ impl SubAgent { task_config: TaskConfig, ) -> Result { debug!("Processing message for subagent {}", self.id); - self.send_mcp_notification("message_processing", &format!("Processing: {}", message)) - .await; // Get provider from task config let provider = self @@ -203,20 +142,6 @@ impl SubAgent { .as_ref() .ok_or_else(|| anyhow!("No provider configured for subagent"))?; - // Check if we've exceeded max turns - { - let turn_count = *self.turn_count.lock().await; - if let Some(max_turns) = self.config.max_turns { - if turn_count >= max_turns { - self.set_status(SubAgentStatus::Completed( - "Maximum turns exceeded".to_string(), - )) - .await; - return Err(anyhow!("Maximum turns ({}) exceeded", max_turns)); - } - } - } - // Set status to processing self.set_status(SubAgentStatus::Processing).await; @@ -227,17 +152,6 @@ impl SubAgent { conversation.push(user_message.clone()); } - // Increment turn count - { - let mut turn_count = self.turn_count.lock().await; - *turn_count += 1; - self.send_mcp_notification( - "turn_progress", - &format!("Turn {}/{}", turn_count, self.config.max_turns.unwrap_or(0)), - ) - .await; - } - // Get the current conversation for context let mut messages = self.get_conversation().await; @@ -255,8 +169,14 @@ impl SubAgent { // Build system prompt using the template let system_prompt = self.build_system_prompt(&tools).await?; + // Generate response from provider with loop for tool processing (max_turns iterations) + let mut loop_count = 0; + let max_turns = self.config.max_turns.unwrap_or(DEFAULT_SUBAGENT_MAX_TURNS); + // Generate response from provider loop { + loop_count += 1; + match Agent::generate_response_from_provider( Arc::clone(provider), &system_prompt, @@ -281,19 +201,9 @@ impl SubAgent { .collect(); // If there are no tool requests, we're done - if tool_requests.is_empty() { + if tool_requests.is_empty() || loop_count >= max_turns { self.add_message(response.clone()).await; - // Send notification about response - self.send_mcp_notification( - "response_generated", - &format!("Responded: {}", response.as_concat_text()), - ) - .await; - - // Add delay before completion to ensure all processing finishes - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - // Set status back to ready and return the final response self.set_status(SubAgentStatus::Completed("Completed!".to_string())) .await; @@ -306,13 +216,6 @@ impl SubAgent { // Process each tool request and create user response messages for request in &tool_requests { if let Ok(tool_call) = &request.tool_call { - // Send notification about tool usage - self.send_mcp_notification( - "tool_usage", - &format!("Using tool: {}", tool_call.name), - ) - .await; - // Handle platform tools or dispatch to extension manager let tool_result = match self .extension_manager @@ -331,13 +234,6 @@ impl SubAgent { let tool_response_message = Message::user() .with_tool_response(request.id.clone(), Ok(result.clone())); messages.push(tool_response_message); - - // Send notification about tool completion - self.send_mcp_notification( - "tool_completed", - &format!("Tool {} completed successfully", tool_call.name), - ) - .await; } Err(e) => { // Create a user message with the tool error @@ -346,13 +242,6 @@ impl SubAgent { Err(ToolError::ExecutionError(e.to_string())), ); messages.push(tool_error_message); - - // Send notification about tool error - self.send_mcp_notification( - "tool_error", - &format!("Tool {} error: {}", tool_call.name, e), - ) - .await; } } } diff --git a/crates/goose/src/agents/subagent_handler.rs b/crates/goose/src/agents/subagent_handler.rs index 960a515b8939..3612d48a30e9 100644 --- a/crates/goose/src/agents/subagent_handler.rs +++ b/crates/goose/src/agents/subagent_handler.rs @@ -8,7 +8,9 @@ pub async fn run_complete_subagent_task( task_config: TaskConfig, ) -> Result { // Create the subagent with the parent agent's provider - let (subagent, handle) = SubAgent::new(task_config.clone()).await?; + let subagent = SubAgent::new(task_config.clone()) + .await + .map_err(|e| ToolError::ExecutionError(format!("Failed to create subagent: {}", e)))?; // Execute the subagent task let result = subagent @@ -16,11 +18,6 @@ pub async fn run_complete_subagent_task( .await?; let response_text = result.as_concat_text(); - // Clean up the subagent handle - if let Err(e) = handle.await { - tracing::debug!("Subagent handle cleanup error: {}", e); - } - // Return the result Ok(response_text) } diff --git a/crates/goose/src/agents/subagent_task_config.rs b/crates/goose/src/agents/subagent_task_config.rs index 81bcd999b8e8..5a0046292967 100644 --- a/crates/goose/src/agents/subagent_task_config.rs +++ b/crates/goose/src/agents/subagent_task_config.rs @@ -1,16 +1,20 @@ use crate::providers::base::Provider; -use rmcp::model::JsonRpcMessage; +use std::env; use std::fmt; use std::sync::Arc; -use tokio::sync::mpsc; use uuid::Uuid; +/// Default maximum number of turns for task execution +pub const DEFAULT_SUBAGENT_MAX_TURNS: usize = 5; + +/// Environment variable name for configuring max turns +pub const GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR: &str = "GOOSE_SUBAGENT_MAX_TURNS"; + /// Configuration for task execution with all necessary dependencies #[derive(Clone)] pub struct TaskConfig { pub id: String, pub provider: Option>, - pub mcp_tx: mpsc::Sender, pub max_turns: Option, } @@ -26,12 +30,16 @@ impl fmt::Debug for TaskConfig { impl TaskConfig { /// Create a new TaskConfig with all required dependencies - pub fn new(provider: Option>, mcp_tx: mpsc::Sender) -> Self { + pub fn new(provider: Option>) -> Self { Self { id: Uuid::new_v4().to_string(), provider, - mcp_tx, - max_turns: Some(10), + max_turns: Some( + env::var(GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR) + .ok() + .and_then(|val| val.parse::().ok()) + .unwrap_or(DEFAULT_SUBAGENT_MAX_TURNS), + ), } } @@ -39,9 +47,4 @@ impl TaskConfig { pub fn provider(&self) -> Option<&Arc> { self.provider.as_ref() } - - /// Get a clone of the MCP sender - pub fn mcp_tx(&self) -> mpsc::Sender { - self.mcp_tx.clone() - } }