diff --git a/crates/goose/src/agents/agent.rs b/crates/goose/src/agents/agent.rs index ba1b0fc1b741..088fba588a71 100644 --- a/crates/goose/src/agents/agent.rs +++ b/crates/goose/src/agents/agent.rs @@ -927,352 +927,352 @@ impl Agent { }); Ok(Box::pin(async_stream::try_stream! { - let _ = reply_span.enter(); - let mut turns_taken = 0u32; - let max_turns = session_config.max_turns.unwrap_or(DEFAULT_MAX_TURNS); + let _ = reply_span.enter(); + let mut turns_taken = 0u32; + let max_turns = session_config.max_turns.unwrap_or(DEFAULT_MAX_TURNS); - loop { - if is_token_cancelled(&cancel_token) { - break; - } + loop { + if is_token_cancelled(&cancel_token) { + break; + } - if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() { - if final_output_tool.final_output.is_some() { - let final_event = AgentEvent::Message( - Message::assistant().with_text(final_output_tool.final_output.clone().unwrap()) - ); - yield final_event; - break; - } - } + if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() { + if final_output_tool.final_output.is_some() { + let final_event = AgentEvent::Message( + Message::assistant().with_text(final_output_tool.final_output.clone().unwrap()) + ); + yield final_event; + break; + } + } - turns_taken += 1; - if turns_taken > max_turns { - yield AgentEvent::Message( - Message::assistant().with_text( - "I've reached the maximum number of actions I can do without user input. Would you like me to continue?" - ) - ); - break; - } + turns_taken += 1; + if turns_taken > max_turns { + yield AgentEvent::Message( + Message::assistant().with_text( + "I've reached the maximum number of actions I can do without user input. Would you like me to continue?" + ) + ); + break; + } - let conversation_with_moim = super::moim::inject_moim( - conversation.clone(), - &self.extension_manager, - ).await; - - let mut stream = Self::stream_response_from_provider( - self.provider().await?, - &system_prompt, - conversation_with_moim.messages(), - &tools, - &toolshim_tools, - ).await?; - - let mut no_tools_called = true; - let mut messages_to_add = Conversation::default(); - let mut tools_updated = false; - let mut did_recovery_compact_this_iteration = false; - - while let Some(next) = stream.next().await { - if is_token_cancelled(&cancel_token) { - break; - } + let conversation_with_moim = super::moim::inject_moim( + conversation.clone(), + &self.extension_manager, + ).await; + + let mut stream = Self::stream_response_from_provider( + self.provider().await?, + &system_prompt, + conversation_with_moim.messages(), + &tools, + &toolshim_tools, + ).await?; - match next { - Ok((response, usage)) => { - // Emit model change event if provider is lead-worker - let provider = self.provider().await?; - if let Some(lead_worker) = provider.as_lead_worker() { - if let Some(ref usage) = usage { - let active_model = usage.model.clone(); - let (lead_model, worker_model) = lead_worker.get_model_info(); - let mode = if active_model == lead_model { - "lead" - } else if active_model == worker_model { - "worker" - } else { - "unknown" - }; + let mut no_tools_called = true; + let mut messages_to_add = Conversation::default(); + let mut tools_updated = false; + let mut did_recovery_compact_this_iteration = false; - yield AgentEvent::ModelChange { - model: active_model, - mode: mode.to_string(), - }; - } - } + while let Some(next) = stream.next().await { + if is_token_cancelled(&cancel_token) { + break; + } - if let Some(ref usage) = usage { - Self::update_session_metrics(&session_config, usage, false).await?; - } + match next { + Ok((response, usage)) => { + // Emit model change event if provider is lead-worker + let provider = self.provider().await?; + if let Some(lead_worker) = provider.as_lead_worker() { + if let Some(ref usage) = usage { + let active_model = usage.model.clone(); + let (lead_model, worker_model) = lead_worker.get_model_info(); + let mode = if active_model == lead_model { + "lead" + } else if active_model == worker_model { + "worker" + } else { + "unknown" + }; + + yield AgentEvent::ModelChange { + model: active_model, + mode: mode.to_string(), + }; + } + } - if let Some(response) = response { - let ToolCategorizeResult { - frontend_requests, - remaining_requests, - filtered_response, - } = self.categorize_tools(&response, &tools).await; - let requests_to_record: Vec = frontend_requests.iter().chain(remaining_requests.iter()).cloned().collect(); - self.tool_route_manager - .record_tool_requests(&requests_to_record) - .await; + if let Some(ref usage) = usage { + Self::update_session_metrics(&session_config, usage, false).await?; + } - yield AgentEvent::Message(filtered_response.clone()); - tokio::task::yield_now().await; + if let Some(response) = response { + let ToolCategorizeResult { + frontend_requests, + remaining_requests, + filtered_response, + } = self.categorize_tools(&response, &tools).await; + let requests_to_record: Vec = frontend_requests.iter().chain(remaining_requests.iter()).cloned().collect(); + self.tool_route_manager + .record_tool_requests(&requests_to_record) + .await; + + yield AgentEvent::Message(filtered_response.clone()); + tokio::task::yield_now().await; + + let num_tool_requests = frontend_requests.len() + remaining_requests.len(); + if num_tool_requests == 0 { + messages_to_add.push(response.clone()); + continue; + } - let num_tool_requests = frontend_requests.len() + remaining_requests.len(); - if num_tool_requests == 0 { - messages_to_add.push(response.clone()); - continue; - } + let tool_response_messages: Vec>> = (0..num_tool_requests) + .map(|_| Arc::new(Mutex::new(Message::user().with_id( + format!("msg_{}", Uuid::new_v4()) + )))) + .collect(); - let tool_response_messages: Vec>> = (0..num_tool_requests) - .map(|_| Arc::new(Mutex::new(Message::user().with_id( - format!("msg_{}", Uuid::new_v4()) - )))) - .collect(); + let mut request_to_response_map = HashMap::new(); + for (idx, request) in frontend_requests.iter().chain(remaining_requests.iter()).enumerate() { + request_to_response_map.insert(request.id.clone(), tool_response_messages[idx].clone()); + } - let mut request_to_response_map = HashMap::new(); - for (idx, request) in frontend_requests.iter().chain(remaining_requests.iter()).enumerate() { - request_to_response_map.insert(request.id.clone(), tool_response_messages[idx].clone()); - } + for (idx, request) in frontend_requests.iter().enumerate() { + let mut frontend_tool_stream = self.handle_frontend_tool_request( + request, + tool_response_messages[idx].clone(), + ); - for (idx, request) in frontend_requests.iter().enumerate() { - let mut frontend_tool_stream = self.handle_frontend_tool_request( - request, - tool_response_messages[idx].clone(), - ); + while let Some(msg) = frontend_tool_stream.try_next().await? { + yield AgentEvent::Message(msg); + } + } + if goose_mode == GooseMode::Chat { + // Skip all remaining tool calls in chat mode + for request in remaining_requests.iter() { + if let Some(response_msg) = request_to_response_map.get(&request.id) { + let mut response = response_msg.lock().await; + *response = response.clone().with_tool_response( + request.id.clone(), + Ok(vec![Content::text(CHAT_MODE_TOOL_SKIPPED_RESPONSE)]), + ); + } + } + } else { + // Run all tool inspectors + let inspection_results = self.tool_inspection_manager + .inspect_tools( + &remaining_requests, + conversation.messages(), + ) + .await?; - while let Some(msg) = frontend_tool_stream.try_next().await? { - yield AgentEvent::Message(msg); - } - } - if goose_mode == GooseMode::Chat { - // Skip all remaining tool calls in chat mode - for request in remaining_requests.iter() { - if let Some(response_msg) = request_to_response_map.get(&request.id) { - let mut response = response_msg.lock().await; - *response = response.clone().with_tool_response( - request.id.clone(), - Ok(vec![Content::text(CHAT_MODE_TOOL_SKIPPED_RESPONSE)]), - ); - } - } - } else { - // Run all tool inspectors - let inspection_results = self.tool_inspection_manager - .inspect_tools( - &remaining_requests, - conversation.messages(), - ) - .await?; + let permission_check_result = self.tool_inspection_manager + .process_inspection_results_with_permission_inspector( + &remaining_requests, + &inspection_results, + ) + .unwrap_or_else(|| { + let mut result = PermissionCheckResult { + approved: vec![], + needs_approval: vec![], + denied: vec![], + }; + result.needs_approval.extend(remaining_requests.iter().cloned()); + result + }); + + // Track extension requests + let mut enable_extension_request_ids = vec![]; + for request in &remaining_requests { + if let Ok(tool_call) = &request.tool_call { + if tool_call.name == MANAGE_EXTENSIONS_TOOL_NAME_COMPLETE { + enable_extension_request_ids.push(request.id.clone()); + } + } + } - let permission_check_result = self.tool_inspection_manager - .process_inspection_results_with_permission_inspector( - &remaining_requests, - &inspection_results, - ) - .unwrap_or_else(|| { - let mut result = PermissionCheckResult { - approved: vec![], - needs_approval: vec![], - denied: vec![], - }; - result.needs_approval.extend(remaining_requests.iter().cloned()); - result - }); - - // Track extension requests - let mut enable_extension_request_ids = vec![]; - for request in &remaining_requests { - if let Ok(tool_call) = &request.tool_call { - if tool_call.name == MANAGE_EXTENSIONS_TOOL_NAME_COMPLETE { - enable_extension_request_ids.push(request.id.clone()); - } - } - } + let mut tool_futures = self.handle_approved_and_denied_tools( + &permission_check_result, + &request_to_response_map, + cancel_token.clone(), + &session, + ).await?; + + let tool_futures_arc = Arc::new(Mutex::new(tool_futures)); + + let mut tool_approval_stream = self.handle_approval_tool_requests( + &permission_check_result.needs_approval, + tool_futures_arc.clone(), + &request_to_response_map, + cancel_token.clone(), + &session, + &inspection_results, + ); - let mut tool_futures = self.handle_approved_and_denied_tools( - &permission_check_result, - &request_to_response_map, - cancel_token.clone(), - &session, - ).await?; + while let Some(msg) = tool_approval_stream.try_next().await? { + yield AgentEvent::Message(msg); + } - let tool_futures_arc = Arc::new(Mutex::new(tool_futures)); + tool_futures = { + let mut futures_lock = tool_futures_arc.lock().await; + futures_lock.drain(..).collect::>() + }; - let mut tool_approval_stream = self.handle_approval_tool_requests( - &permission_check_result.needs_approval, - tool_futures_arc.clone(), - &request_to_response_map, - cancel_token.clone(), - &session, - &inspection_results, - ); + let with_id = tool_futures + .into_iter() + .map(|(request_id, stream)| { + stream.map(move |item| (request_id.clone(), item)) + }) + .collect::>(); - while let Some(msg) = tool_approval_stream.try_next().await? { - yield AgentEvent::Message(msg); - } + let mut combined = stream::select_all(with_id); + let mut all_install_successful = true; - tool_futures = { - let mut futures_lock = tool_futures_arc.lock().await; - futures_lock.drain(..).collect::>() - }; + while let Some((request_id, item)) = combined.next().await { + if is_token_cancelled(&cancel_token) { + break; + } + match item { + ToolStreamItem::Result(output) => { + if enable_extension_request_ids.contains(&request_id) + && output.is_err() + { + all_install_successful = false; + } + if let Some(response_msg) = request_to_response_map.get(&request_id) { + let mut response = response_msg.lock().await; + *response = response.clone().with_tool_response(request_id, output); + } + } + ToolStreamItem::Message(msg) => { + yield AgentEvent::McpNotification((request_id, msg)); + } + } + } - let with_id = tool_futures - .into_iter() - .map(|(request_id, stream)| { - stream.map(move |item| (request_id.clone(), item)) - }) - .collect::>(); + if all_install_successful && !enable_extension_request_ids.is_empty() { + if let Err(e) = self.save_extension_state(&session_config).await { + warn!("Failed to save extension state after runtime changes: {}", e); + } + tools_updated = true; + } + } - let mut combined = stream::select_all(with_id); - let mut all_install_successful = true; + for (idx, request) in frontend_requests.iter().chain(remaining_requests.iter()).enumerate() { + if request.tool_call.is_ok() { + let request_msg = Message::assistant() + .with_id(format!("msg_{}", Uuid::new_v4())) + .with_tool_request(request.id.clone(), request.tool_call.clone()); + messages_to_add.push(request_msg); + let final_response = tool_response_messages[idx] + .lock().await.clone(); + yield AgentEvent::Message(final_response.clone()); + messages_to_add.push(final_response); + } + } - while let Some((request_id, item)) = combined.next().await { - if is_token_cancelled(&cancel_token) { - break; - } - match item { - ToolStreamItem::Result(output) => { - if enable_extension_request_ids.contains(&request_id) - && output.is_err() - { - all_install_successful = false; - } - if let Some(response_msg) = request_to_response_map.get(&request_id) { - let mut response = response_msg.lock().await; - *response = response.clone().with_tool_response(request_id, output); + no_tools_called = false; } } - ToolStreamItem::Message(msg) => { - yield AgentEvent::McpNotification((request_id, msg)); - } - } - } - - if all_install_successful && !enable_extension_request_ids.is_empty() { - if let Err(e) = self.save_extension_state(&session_config).await { - warn!("Failed to save extension state after runtime changes: {}", e); - } - tools_updated = true; - } - } - - for (idx, request) in frontend_requests.iter() - .chain(remaining_requests.iter()).enumerate() { - if request.tool_call.is_ok() { - let request_msg = Message::assistant() - .with_id(format!("msg_{}", Uuid::new_v4())) - .with_tool_request(request.id.clone(), request.tool_call.clone()); - messages_to_add.push(request_msg); - let final_response = tool_response_messages[idx] - .lock().await.clone(); - yield AgentEvent::Message(final_response.clone()); - messages_to_add.push(final_response); - } - } - no_tools_called = false; - } - } - Err(ProviderError::ContextLengthExceeded(_error_msg)) => { - yield AgentEvent::Message( - Message::assistant().with_system_notification( - SystemNotificationType::InlineMessage, - "Context limit reached. Compacting to continue conversation...", - ) - ); - yield AgentEvent::Message( - Message::assistant().with_system_notification( - SystemNotificationType::ThinkingMessage, - COMPACTION_THINKING_TEXT, - ) - ); + Err(ProviderError::ContextLengthExceeded(_error_msg)) => { + yield AgentEvent::Message( + Message::assistant().with_system_notification( + SystemNotificationType::InlineMessage, + "Context limit reached. Compacting to continue conversation...", + ) + ); + yield AgentEvent::Message( + Message::assistant().with_system_notification( + SystemNotificationType::ThinkingMessage, + COMPACTION_THINKING_TEXT, + ) + ); - match compact_messages(self.provider().await?.as_ref(), &conversation, false).await { - Ok((compacted_conversation, usage)) => { - SessionManager::replace_conversation(&session_config.id, &compacted_conversation).await?; - Self::update_session_metrics(&session_config, &usage, true).await?; - conversation = compacted_conversation; - did_recovery_compact_this_iteration = true; - yield AgentEvent::HistoryReplaced(conversation.clone()); - continue; - } - Err(e) => { - error!("Error: {}", e); - yield AgentEvent::Message( - Message::assistant().with_text( - format!("Ran into this error trying to compact: {e}.\n\nPlease retry if you think this is a transient or recoverable error.") - ) - ); - break; - } - } + match compact_messages(self.provider().await?.as_ref(), &conversation, false).await { + Ok((compacted_conversation, usage)) => { + SessionManager::replace_conversation(&session_config.id, &compacted_conversation).await?; + Self::update_session_metrics(&session_config, &usage, true).await?; + conversation = compacted_conversation; + did_recovery_compact_this_iteration = true; + yield AgentEvent::HistoryReplaced(conversation.clone()); + continue; } Err(e) => { error!("Error: {}", e); yield AgentEvent::Message( Message::assistant().with_text( - format!("Ran into this error: {e}.\n\nPlease retry if you think this is a transient or recoverable error.") + format!("Ran into this error trying to compact: {e}.\n\nPlease retry if you think this is a transient or recoverable error.") ) ); break; } } } - if tools_updated { - (tools, toolshim_tools, system_prompt) = - self.prepare_tools_and_prompt(&working_dir).await?; + Err(e) => { + error!("Error: {}", e); + yield AgentEvent::Message( + Message::assistant().with_text( + format!("Ran into this error: {e}.\n\nPlease retry if you think this is a transient or recoverable error.") + ) + ); + break; + } + } + } + if tools_updated { + (tools, toolshim_tools, system_prompt) = + self.prepare_tools_and_prompt(&working_dir).await?; + } + let mut exit_chat = false; + if no_tools_called { + if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() { + if final_output_tool.final_output.is_none() { + warn!("Final output tool has not been called yet. Continuing agent loop."); + let message = Message::user().with_text(FINAL_OUTPUT_CONTINUATION_MESSAGE); + messages_to_add.push(message.clone()); + yield AgentEvent::Message(message); + } else { + let message = Message::assistant().with_text(final_output_tool.final_output.clone().unwrap()); + messages_to_add.push(message.clone()); + yield AgentEvent::Message(message); + exit_chat = true; } - let mut exit_chat = false; - if no_tools_called { - if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() { - if final_output_tool.final_output.is_none() { - warn!("Final output tool has not been called yet. Continuing agent loop."); - let message = Message::user().with_text(FINAL_OUTPUT_CONTINUATION_MESSAGE); - messages_to_add.push(message.clone()); - yield AgentEvent::Message(message); + } else if did_recovery_compact_this_iteration { + // Avoid setting exit_chat; continue from last user message in the conversation + } else { + match self.handle_retry_logic(&mut conversation, &session_config, &initial_messages).await { + Ok(should_retry) => { + if should_retry { + info!("Retry logic triggered, restarting agent loop"); } else { - let message = Message::assistant().with_text(final_output_tool.final_output.clone().unwrap()); - messages_to_add.push(message.clone()); - yield AgentEvent::Message(message); exit_chat = true; } - } else if did_recovery_compact_this_iteration { - // Avoid setting exit_chat; continue from last user message in the conversation - } else { - match self.handle_retry_logic(&mut conversation, &session_config, &initial_messages).await { - Ok(should_retry) => { - if should_retry { - info!("Retry logic triggered, restarting agent loop"); - } else { - exit_chat = true; - } - } - Err(e) => { - error!("Retry logic failed: {}", e); - yield AgentEvent::Message( - Message::assistant().with_text( - format!("Retry logic encountered an error: {}", e) - ) - ); - exit_chat = true; - } - } + } + Err(e) => { + error!("Retry logic failed: {}", e); + yield AgentEvent::Message( + Message::assistant().with_text( + format!("Retry logic encountered an error: {}", e) + ) + ); + exit_chat = true; } } + } + } - for msg in &messages_to_add { - SessionManager::add_message(&session_config.id, msg).await?; - } - conversation.extend(messages_to_add); - if exit_chat { - break; - } + for msg in &messages_to_add { + SessionManager::add_message(&session_config.id, msg).await?; + } + conversation.extend(messages_to_add); + if exit_chat { + break; + } - tokio::task::yield_now().await; - } - })) + tokio::task::yield_now().await; + } + })) } pub async fn extend_system_prompt(&self, instruction: String) {