Merged
40 changes: 1 addition & 39 deletions crates/goose/src/agents/agent.rs
@@ -75,8 +75,6 @@ pub struct Agent {
pub(super) tool_monitor: Arc<Mutex<Option<ToolMonitor>>>,
pub(super) router_tool_selector: Mutex<Option<Arc<Box<dyn RouterToolSelector>>>>,
pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>,
pub(super) mcp_tx: Mutex<mpsc::Sender<JsonRpcMessage>>,
pub(super) mcp_notification_rx: Arc<Mutex<mpsc::Receiver<JsonRpcMessage>>>,
pub(super) retry_manager: RetryManager,
}

@@ -132,8 +130,6 @@ impl Agent {
// Create channels with buffer size 32 (adjust if needed)
let (confirm_tx, confirm_rx) = mpsc::channel(32);
let (tool_tx, tool_rx) = mpsc::channel(32);
// Add MCP notification channel
let (mcp_tx, mcp_rx) = mpsc::channel(100);

let tool_monitor = Arc::new(Mutex::new(None));
let retry_manager = RetryManager::with_tool_monitor(tool_monitor.clone());
@@ -154,9 +150,6 @@
tool_monitor,
router_tool_selector: Mutex::new(None),
scheduler_service: Mutex::new(None),
// Initialize with MCP notification support
mcp_tx: Mutex::new(mcp_tx),
mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)),
retry_manager,
}
}
@@ -342,9 +335,8 @@ impl Agent {
.await
} else if tool_call.name == SUBAGENT_EXECUTE_TASK_TOOL_NAME {
let provider = self.provider().await.ok();
let mcp_tx = self.mcp_tx.lock().await.clone();

let task_config = TaskConfig::new(provider, mcp_tx);
let task_config = TaskConfig::new(provider);
subagent_execute_task_tool::run_tasks(
tool_call.arguments.clone(),
task_config,
@@ -771,24 +763,6 @@ impl Agent {
break;
}

// Handle MCP notifications from subagents
let mcp_notifications = self.get_mcp_notifications().await;
for notification in mcp_notifications {
if let JsonRpcMessage::Notification(notif) = &notification {
if let Some(data) = notif.notification.params.get("data") {
if let (Some(subagent_id), Some(_message)) = (
data.get("subagent_id").and_then(|v| v.as_str()),
data.get("message").and_then(|v| v.as_str()),
) {
yield AgentEvent::McpNotification((
subagent_id.to_string(),
notification.clone(),
));
}
}
}
}

let mut stream = Self::stream_response_from_provider(
self.provider().await?,
&system_prompt,
@@ -1085,18 +1059,6 @@ impl Agent {
prompt_manager.add_system_prompt_extra(instruction);
}

/// Get MCP notifications from subagents
pub async fn get_mcp_notifications(&self) -> Vec<JsonRpcMessage> {
let mut notifications = Vec::new();
let mut rx = self.mcp_notification_rx.lock().await;

while let Ok(notification) = rx.try_recv() {
notifications.push(notification);
}

notifications
}

pub async fn update_provider(&self, provider: Arc<dyn Provider>) -> Result<()> {
let mut current_provider = self.provider.lock().await;
*current_provider = Some(provider.clone());
131 changes: 10 additions & 121 deletions crates/goose/src/agents/subagent.rs
@@ -1,3 +1,4 @@
use crate::agents::subagent_task_config::DEFAULT_SUBAGENT_MAX_TURNS;
use crate::{
agents::extension::ExtensionConfig,
agents::{extension_manager::ExtensionManager, Agent, TaskConfig},
@@ -9,8 +10,6 @@ use crate::{
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use mcp_core::{handler::ToolError, tool::Tool};
use rmcp::model::{JsonRpcMessage, JsonRpcNotification, JsonRpcVersion2_0, Notification};
use rmcp::object;
use serde::{Deserialize, Serialize};
// use serde_json::{self};
use std::{collections::HashMap, sync::Arc};
@@ -53,7 +52,7 @@ impl SubAgent {
#[instrument(skip(task_config))]
pub async fn new(
task_config: TaskConfig,
) -> Result<(Arc<Self>, tokio::task::JoinHandle<()>), anyhow::Error> {
) -> Result<Arc<Self>, anyhow::Error> {
debug!("Creating new subagent with id: {}", task_config.id);

// Create a new extension manager for this subagent
@@ -89,21 +88,8 @@ impl SubAgent {
extension_manager: Arc::new(RwLock::new(extension_manager)),
});

// Send initial MCP notification
let subagent_clone = Arc::clone(&subagent);
subagent_clone
.send_mcp_notification("subagent_created", "Subagent created and ready")
.await;

// Create a background task handle (for future use with streaming/monitoring)
let subagent_clone = Arc::clone(&subagent);
let handle = tokio::spawn(async move {
// This could be used for background monitoring, cleanup, etc.
debug!("Subagent {} background task started", subagent_clone.id);
});

debug!("Subagent {} created successfully", subagent.id);
Ok((subagent, handle))
Ok(subagent)
}

/// Get the current status of the subagent
@@ -118,51 +104,6 @@ impl SubAgent {
let mut current_status = self.status.write().await;
*current_status = status.clone();
} // Write lock is released here!

// Send MCP notifications based on status
match &status {
SubAgentStatus::Processing => {
self.send_mcp_notification("status_changed", "Processing request")
.await;
}
SubAgentStatus::Completed(msg) => {
self.send_mcp_notification("completed", &format!("Completed: {}", msg))
.await;
}
SubAgentStatus::Terminated => {
self.send_mcp_notification("terminated", "Subagent terminated")
.await;
}
_ => {}
}
}

/// Send an MCP notification about the subagent's activity
pub async fn send_mcp_notification(&self, notification_type: &str, message: &str) {
let notification = JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: JsonRpcVersion2_0,
notification: Notification {
method: "notifications/message".to_string(),
params: object!({
"level": "info",
"logger": format!("subagent_{}", self.id),
"data": {
"subagent_id": self.id,
"type": notification_type,
"message": message,
"timestamp": Utc::now().to_rfc3339()
}
}),
extensions: Default::default(),
},
});

if let Err(e) = self.config.mcp_tx.send(notification).await {
error!(
"Failed to send MCP notification from subagent {}: {}",
self.id, e
);
}
}

/// Get current progress information
@@ -193,8 +134,6 @@
task_config: TaskConfig,
) -> Result<Message, anyhow::Error> {
debug!("Processing message for subagent {}", self.id);
self.send_mcp_notification("message_processing", &format!("Processing: {}", message))
.await;

// Get provider from task config
let provider = self
@@ -203,20 +142,6 @@ impl SubAgent {
.as_ref()
.ok_or_else(|| anyhow!("No provider configured for subagent"))?;

// Check if we've exceeded max turns
{
let turn_count = *self.turn_count.lock().await;
if let Some(max_turns) = self.config.max_turns {
if turn_count >= max_turns {
self.set_status(SubAgentStatus::Completed(
"Maximum turns exceeded".to_string(),
))
.await;
return Err(anyhow!("Maximum turns ({}) exceeded", max_turns));
}
}
}

// Set status to processing
self.set_status(SubAgentStatus::Processing).await;

@@ -227,17 +152,6 @@ impl SubAgent {
conversation.push(user_message.clone());
}

// Increment turn count
{
let mut turn_count = self.turn_count.lock().await;
*turn_count += 1;
self.send_mcp_notification(
"turn_progress",
&format!("Turn {}/{}", turn_count, self.config.max_turns.unwrap_or(0)),
)
.await;
}

// Get the current conversation for context
let mut messages = self.get_conversation().await;

@@ -255,8 +169,14 @@ impl SubAgent {
// Build system prompt using the template
let system_prompt = self.build_system_prompt(&tools).await?;

// Generate response from provider with loop for tool processing (max_turns iterations)
let mut loop_count = 0;
let max_turns = self.config.max_turns.unwrap_or(DEFAULT_SUBAGENT_MAX_TURNS);

// Generate response from provider
loop {
loop_count += 1;

match Agent::generate_response_from_provider(
Arc::clone(provider),
&system_prompt,
@@ -281,19 +201,9 @@ impl SubAgent {
.collect();

// If there are no tool requests, we're done
if tool_requests.is_empty() {
if tool_requests.is_empty() || loop_count >= max_turns {
self.add_message(response.clone()).await;

// Send notification about response
self.send_mcp_notification(
"response_generated",
&format!("Responded: {}", response.as_concat_text()),
)
.await;

// Add delay before completion to ensure all processing finishes
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

// Set status back to ready and return the final response
self.set_status(SubAgentStatus::Completed("Completed!".to_string()))
.await;
@@ -306,13 +216,6 @@ impl SubAgent {
// Process each tool request and create user response messages
for request in &tool_requests {
if let Ok(tool_call) = &request.tool_call {
// Send notification about tool usage
self.send_mcp_notification(
"tool_usage",
&format!("Using tool: {}", tool_call.name),
)
.await;

// Handle platform tools or dispatch to extension manager
let tool_result = match self
.extension_manager
@@ -331,13 +234,6 @@ impl SubAgent {
let tool_response_message = Message::user()
.with_tool_response(request.id.clone(), Ok(result.clone()));
messages.push(tool_response_message);

// Send notification about tool completion
self.send_mcp_notification(
"tool_completed",
&format!("Tool {} completed successfully", tool_call.name),
)
.await;
}
Err(e) => {
// Create a user message with the tool error
Expand All @@ -346,13 +242,6 @@ impl SubAgent {
Err(ToolError::ExecutionError(e.to_string())),
);
messages.push(tool_error_message);

// Send notification about tool error
self.send_mcp_notification(
"tool_error",
&format!("Tool {} error: {}", tool_call.name, e),
)
.await;
}
}
}
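To make the new control flow in subagent.rs easier to follow in isolation, here is a rough sketch of the turn-capped loop that replaces the old turn_count bookkeeping and MCP progress notifications. This is a simplified model only: the placeholder `Response` type and the synchronous `next_response` closure stand in for the real async provider call and tool dispatch through the extension manager.

```rust
// Simplified sketch of the turn-capped loop; `Response` and `next_response`
// are illustrative stand-ins, not types from the goose crate.
const DEFAULT_SUBAGENT_MAX_TURNS: usize = 5;

struct Response {
    tool_requests: Vec<String>,
    text: String,
}

fn run(max_turns: Option<usize>, mut next_response: impl FnMut() -> Response) -> String {
    // Fall back to the default when the task config leaves max_turns unset.
    let max_turns = max_turns.unwrap_or(DEFAULT_SUBAGENT_MAX_TURNS);
    let mut loop_count = 0;

    loop {
        loop_count += 1;
        let response = next_response();

        // Stop when the model makes no tool calls, or when the turn budget is spent.
        if response.tool_requests.is_empty() || loop_count >= max_turns {
            return response.text;
        }

        // Otherwise each tool request would be dispatched and its result fed back
        // into the conversation before the provider is called again (elided here).
    }
}
```

The effect is that the turn limit is now enforced inside the response loop itself rather than by a separate counter checked at the top of reply_subagent.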
9 changes: 3 additions & 6 deletions crates/goose/src/agents/subagent_handler.rs
@@ -8,19 +8,16 @@ pub async fn run_complete_subagent_task(
task_config: TaskConfig,
) -> Result<String, anyhow::Error> {
// Create the subagent with the parent agent's provider
let (subagent, handle) = SubAgent::new(task_config.clone()).await?;
let subagent = SubAgent::new(task_config.clone())
.await
.map_err(|e| ToolError::ExecutionError(format!("Failed to create subagent: {}", e)))?;

// Execute the subagent task
let result = subagent
.reply_subagent(text_instruction, task_config)
.await?;
let response_text = result.as_concat_text();

// Clean up the subagent handle
if let Err(e) = handle.await {
tracing::debug!("Subagent handle cleanup error: {}", e);
}

// Return the result
Ok(response_text)
}
25 changes: 14 additions & 11 deletions crates/goose/src/agents/subagent_task_config.rs
@@ -1,16 +1,20 @@
use crate::providers::base::Provider;
use rmcp::model::JsonRpcMessage;
use std::env;
use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc;
use uuid::Uuid;

/// Default maximum number of turns for task execution
pub const DEFAULT_SUBAGENT_MAX_TURNS: usize = 5;

/// Environment variable name for configuring max turns
pub const GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR: &str = "GOOSE_SUBAGENT_MAX_TURNS";

/// Configuration for task execution with all necessary dependencies
#[derive(Clone)]
pub struct TaskConfig {
pub id: String,
pub provider: Option<Arc<dyn Provider>>,
pub mcp_tx: mpsc::Sender<JsonRpcMessage>,
pub max_turns: Option<usize>,
}

@@ -26,22 +30,21 @@ impl fmt::Debug for TaskConfig {

impl TaskConfig {
/// Create a new TaskConfig with all required dependencies
pub fn new(provider: Option<Arc<dyn Provider>>, mcp_tx: mpsc::Sender<JsonRpcMessage>) -> Self {
pub fn new(provider: Option<Arc<dyn Provider>>) -> Self {
Self {
id: Uuid::new_v4().to_string(),
provider,
mcp_tx,
max_turns: Some(10),
max_turns: Some(
env::var(GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR)
.ok()
.and_then(|val| val.parse::<usize>().ok())
.unwrap_or(DEFAULT_SUBAGENT_MAX_TURNS),
),
}
}

/// Get a reference to the provider
pub fn provider(&self) -> Option<&Arc<dyn Provider>> {
self.provider.as_ref()
}

/// Get a clone of the MCP sender
pub fn mcp_tx(&self) -> mpsc::Sender<JsonRpcMessage> {
self.mcp_tx.clone()
}
}
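For reference, a minimal standalone sketch of the new env-var fallback used by `TaskConfig::new`. The constant names and the parse/fallback chain are taken from the diff above; the `resolve_max_turns` helper and the `main` function are illustrative only, not part of the PR.

```rust
use std::env;

// Constants as introduced in subagent_task_config.rs.
const DEFAULT_SUBAGENT_MAX_TURNS: usize = 5;
const GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR: &str = "GOOSE_SUBAGENT_MAX_TURNS";

/// Illustrative helper mirroring the fallback chain inside TaskConfig::new:
/// read GOOSE_SUBAGENT_MAX_TURNS, parse it as a usize, otherwise use the default.
fn resolve_max_turns() -> usize {
    env::var(GOOSE_SUBAGENT_MAX_TURNS_ENV_VAR)
        .ok()
        .and_then(|val| val.parse::<usize>().ok())
        .unwrap_or(DEFAULT_SUBAGENT_MAX_TURNS)
}

fn main() {
    // With GOOSE_SUBAGENT_MAX_TURNS unset or unparsable this prints 5;
    // running with e.g. GOOSE_SUBAGENT_MAX_TURNS=8 prints 8.
    println!("subagent max turns: {}", resolve_max_turns());
}
```

Note that the field stays `max_turns: Option<usize>`, so existing call sites keep working; only the hard-coded `Some(10)` is replaced by the env-configurable default of 5.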