-
Notifications
You must be signed in to change notification settings - Fork 2.7k
feat: consolidate subagent execution for dynamic tasks #3444
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
75d3cd1
cb0e6a5
476651f
b50d245
9ca6b25
7446692
b848409
a02c056
f7832da
e601ca2
4d11f37
d8fdedd
f1c38f9
54d5224
47e769e
85d5286
2ccfb68
4453998
c984764
c14da81
a01d712
41c46a6
2c0f24c
41fc7ca
353c52a
17a5213
4f52ab1
8f4f55f
6c6113d
2c0d73f
0e01630
5007699
1f010d1
dc0ec41
0c513b4
f65ba7b
40546b8
545ea23
ae0933e
8d7e402
564369a
b28ce9c
7c492e3
11f33ff
e4d822a
aa25ef4
161c809
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,11 +9,14 @@ use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; | |
| use mcp_core::protocol::JsonRpcMessage; | ||
|
|
||
| use crate::agents::final_output_tool::{FINAL_OUTPUT_CONTINUATION_MESSAGE, FINAL_OUTPUT_TOOL_NAME}; | ||
| use crate::agents::sub_recipe_execution_tool::sub_recipe_execute_task_tool::{ | ||
| self, SUB_RECIPE_EXECUTE_TASK_TOOL_NAME, | ||
| use crate::agents::recipe_tools::dynamic_task_tools::{ | ||
| create_dynamic_task, create_dynamic_task_tool, DYNAMIC_TASK_TOOL_NAME_PREFIX, | ||
| }; | ||
| use crate::agents::sub_recipe_execution_tool::tasks_manager::TasksManager; | ||
| use crate::agents::sub_recipe_manager::SubRecipeManager; | ||
| use crate::agents::subagent_execution_tool::subagent_execute_task_tool::{ | ||
| self, SUBAGENT_EXECUTE_TASK_TOOL_NAME, | ||
| }; | ||
| use crate::agents::subagent_execution_tool::tasks_manager::TasksManager; | ||
| use crate::config::{Config, ExtensionConfigManager, PermissionManager}; | ||
| use crate::message::{push_message, Message}; | ||
| use crate::permission::permission_judge::check_tool_permissions; | ||
|
|
@@ -48,21 +51,18 @@ use mcp_core::{ | |
| prompt::Prompt, protocol::GetPromptResult, tool::Tool, Content, ToolError, ToolResult, | ||
| }; | ||
|
|
||
| use crate::agents::subagent_tools::SUBAGENT_RUN_TASK_TOOL_NAME; | ||
|
|
||
| use super::final_output_tool::FinalOutputTool; | ||
| use super::platform_tools; | ||
| use super::router_tools; | ||
| use super::subagent_manager::SubAgentManager; | ||
| use super::subagent_tools; | ||
| use super::tool_execution::{ToolCallResult, CHAT_MODE_TOOL_SKIPPED_RESPONSE, DECLINED_RESPONSE}; | ||
| use crate::agents::subagent_task_config::TaskConfig; | ||
|
|
||
| const DEFAULT_MAX_TURNS: u32 = 1000; | ||
|
|
||
| /// The main goose Agent | ||
| pub struct Agent { | ||
| pub(super) provider: Mutex<Option<Arc<dyn Provider>>>, | ||
| pub(super) extension_manager: RwLock<ExtensionManager>, | ||
| pub(super) extension_manager: Arc<RwLock<ExtensionManager>>, | ||
| pub(super) sub_recipe_manager: Mutex<SubRecipeManager>, | ||
| pub(super) tasks_manager: TasksManager, | ||
| pub(super) final_output_tool: Mutex<Option<FinalOutputTool>>, | ||
|
|
@@ -76,7 +76,7 @@ pub struct Agent { | |
| pub(super) tool_monitor: Mutex<Option<ToolMonitor>>, | ||
| pub(super) router_tool_selector: Mutex<Option<Arc<Box<dyn RouterToolSelector>>>>, | ||
| pub(super) scheduler_service: Mutex<Option<Arc<dyn SchedulerTrait>>>, | ||
| pub(super) subagent_manager: Mutex<Option<SubAgentManager>>, | ||
| pub(super) mcp_tx: Mutex<mpsc::Sender<JsonRpcMessage>>, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand correctly, this will be eventually passed to the subagent via the I feel we can reuse the channel here https://github.com/block/goose/blob/wtang/execute_dynamic_tasks/crates/goose/src/agents/subagent_execution_tool/subagent_execute_task_tool.rs#L67 instead
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can address the channel updates as a follow up! |
||
| pub(super) mcp_notification_rx: Arc<Mutex<mpsc::Receiver<JsonRpcMessage>>>, | ||
| } | ||
|
|
||
|
|
@@ -137,7 +137,7 @@ impl Agent { | |
|
|
||
| Self { | ||
| provider: Mutex::new(None), | ||
| extension_manager: RwLock::new(ExtensionManager::new()), | ||
| extension_manager: Arc::new(RwLock::new(ExtensionManager::new())), | ||
| sub_recipe_manager: Mutex::new(SubRecipeManager::new()), | ||
| tasks_manager: TasksManager::new(), | ||
| final_output_tool: Mutex::new(None), | ||
|
|
@@ -152,7 +152,7 @@ impl Agent { | |
| router_tool_selector: Mutex::new(None), | ||
| scheduler_service: Mutex::new(None), | ||
| // Initialize with MCP notification support | ||
| subagent_manager: Mutex::new(Some(SubAgentManager::new(mcp_tx))), | ||
| mcp_tx: Mutex::new(mcp_tx), | ||
| mcp_notification_rx: Arc::new(Mutex::new(mcp_rx)), | ||
| } | ||
| } | ||
|
|
@@ -300,12 +300,20 @@ impl Agent { | |
| &self.tasks_manager, | ||
| ) | ||
| .await | ||
| } else if tool_call.name == SUB_RECIPE_EXECUTE_TASK_TOOL_NAME { | ||
| sub_recipe_execute_task_tool::run_tasks( | ||
| } else if tool_call.name == SUBAGENT_EXECUTE_TASK_TOOL_NAME { | ||
| let provider = self.provider().await.ok(); | ||
| let mcp_tx = self.mcp_tx.lock().await.clone(); | ||
|
|
||
| let task_config = | ||
| TaskConfig::new(provider, Some(Arc::clone(&self.extension_manager)), mcp_tx); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No turns out we need the extension manager in order to dispatch mcp tool calls
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I agree, we can address this afterwards.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Just curious how sub-recipe pass the extensions to the subAgent.rs then? I remember we were planning to have |
||
| subagent_execute_task_tool::run_tasks( | ||
| tool_call.arguments.clone(), | ||
| task_config, | ||
| &self.tasks_manager, | ||
| ) | ||
| .await | ||
| } else if tool_call.name == DYNAMIC_TASK_TOOL_NAME_PREFIX { | ||
| create_dynamic_task(tool_call.arguments.clone(), &self.tasks_manager).await | ||
| } else if tool_call.name == PLATFORM_READ_RESOURCE_TOOL_NAME { | ||
| // Check if the tool is read_resource and handle it separately | ||
| ToolCallResult::from( | ||
|
|
@@ -321,11 +329,6 @@ impl Agent { | |
| ) | ||
| } else if tool_call.name == PLATFORM_SEARCH_AVAILABLE_EXTENSIONS_TOOL_NAME { | ||
| ToolCallResult::from(extension_manager.search_available_extensions().await) | ||
| } else if tool_call.name == SUBAGENT_RUN_TASK_TOOL_NAME { | ||
| ToolCallResult::from( | ||
| self.handle_run_subagent_task(tool_call.arguments.clone()) | ||
| .await, | ||
| ) | ||
| } else if self.is_frontend_tool(&tool_call.name).await { | ||
| // For frontend tools, return an error indicating we need frontend execution | ||
| ToolCallResult::from(Err(ToolError::ExecutionError( | ||
|
|
@@ -567,11 +570,8 @@ impl Agent { | |
| platform_tools::manage_schedule_tool(), | ||
| ]); | ||
|
|
||
| // Add subagent tool (only if ALPHA_FEATURES is enabled) | ||
| let config = Config::global(); | ||
| if config.get_param::<bool>("ALPHA_FEATURES").unwrap_or(false) { | ||
| prefixed_tools.push(subagent_tools::run_task_subagent_tool()); | ||
| } | ||
| // Dynamic task tool | ||
| prefixed_tools.push(create_dynamic_task_tool()); | ||
|
|
||
| // Add resource tools if supported | ||
| if extension_manager.supports_resources() { | ||
|
|
@@ -589,8 +589,7 @@ impl Agent { | |
| if let Some(final_output_tool) = self.final_output_tool.lock().await.as_ref() { | ||
| prefixed_tools.push(final_output_tool.tool()); | ||
| } | ||
| prefixed_tools | ||
| .push(sub_recipe_execute_task_tool::create_sub_recipe_execute_task_tool()); | ||
| prefixed_tools.push(subagent_execute_task_tool::create_subagent_execute_task_tool()); | ||
| } | ||
|
|
||
| prefixed_tools | ||
|
|
@@ -1074,15 +1073,6 @@ impl Agent { | |
| let mut current_provider = self.provider.lock().await; | ||
| *current_provider = Some(provider.clone()); | ||
|
|
||
| // Initialize subagent manager with MCP notification support | ||
| // Need to recreate the MCP channel since we're replacing the manager | ||
| let (mcp_tx, mcp_rx) = mpsc::channel(100); | ||
| { | ||
| let mut rx_guard = self.mcp_notification_rx.lock().await; | ||
| *rx_guard = mcp_rx; | ||
| } | ||
| *self.subagent_manager.lock().await = Some(SubAgentManager::new(mcp_tx)); | ||
|
|
||
| self.update_router_tool_selector(Some(provider), None) | ||
| .await?; | ||
| Ok(()) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| // ======================================= | ||
| // Module: Dynamic Task Tools | ||
| // Handles creation of tasks dynamically without sub-recipes | ||
| // ======================================= | ||
| use crate::agents::subagent_execution_tool::tasks_manager::TasksManager; | ||
| use crate::agents::subagent_execution_tool::{lib::ExecutionMode, task_types::Task}; | ||
| use crate::agents::tool_execution::ToolCallResult; | ||
| use mcp_core::{tool::ToolAnnotations, Content, Tool, ToolError}; | ||
| use serde_json::{json, Value}; | ||
|
|
||
| pub const DYNAMIC_TASK_TOOL_NAME_PREFIX: &str = "dynamic_task__create_task"; | ||
|
|
||
| pub fn create_dynamic_task_tool() -> Tool { | ||
| Tool::new( | ||
| DYNAMIC_TASK_TOOL_NAME_PREFIX.to_string(), | ||
| "Use this tool to create one or more dynamic tasks from a shared text instruction and varying parameters.\ | ||
| How it works: | ||
| - Provide a single text instruction | ||
| - Use the 'task_parameters' field to pass an array of parameter sets | ||
| - Each resulting task will use the same instruction with different parameter values | ||
| This is useful when performing the same operation across many inputs (e.g., getting weather for multiple cities, searching multiple slack channels, iterating through various linear tickets, etc). | ||
| Once created, these tasks should be passed to the 'subagent__execute_task' tool for execution. Tasks can run sequentially or in parallel. | ||
| --- | ||
| What is a 'subagent'? | ||
| A 'subagent' is a stateless sub-process that executes a single task independently. Use subagents when: | ||
| - You want to parallelize similar work across different inputs | ||
| - You are not sure your search or operation will succeed on the first try | ||
| Each subagent receives a task with a defined payload and returns a result, which is not visible to the user unless explicitly summarized by the system. | ||
| --- | ||
| Examples of 'task_parameters' for a single task: | ||
| text_instruction: Search for the config file in the root directory. | ||
| Examples of 'task_parameters' for multiple tasks: | ||
| text_instruction: Get weather for Melbourne. | ||
| timeout_seconds: 300 | ||
| text_instruction: Get weather for Los Angeles. | ||
| timeout_seconds: 300 | ||
| text_instruction: Get weather for San Francisco. | ||
| timeout_seconds: 300 | ||
| ".to_string(), | ||
| json!({ | ||
| "type": "object", | ||
| "properties": { | ||
| "task_parameters": { | ||
| "type": "array", | ||
| "description": "Array of parameter sets for creating tasks. \ | ||
| For a single task, provide an array with one element. \ | ||
| For multiple tasks, provide an array with multiple elements, each with different parameter values. \ | ||
| If there is no parameter set, provide an empty array.", | ||
| "items": { | ||
| "type": "object", | ||
| "properties": { | ||
| "text_instruction": { | ||
| "type": "string", | ||
| "description": "The text instruction to execute" | ||
| }, | ||
| "timeout_seconds": { | ||
wendytang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "type": "integer", | ||
| "description": "Optional timeout for the task in seconds (default: 300)", | ||
| "minimum": 1 | ||
| } | ||
| }, | ||
| "required": ["text_instruction"] | ||
| } | ||
| } | ||
| } | ||
| }), | ||
| Some(ToolAnnotations { | ||
| title: Some("Dynamic Task Creation".to_string()), | ||
| read_only_hint: false, | ||
| destructive_hint: true, | ||
| idempotent_hint: false, | ||
| open_world_hint: true, | ||
| }), | ||
| ) | ||
| } | ||
|
|
||
| fn extract_task_parameters(params: &Value) -> Vec<Value> { | ||
| params | ||
| .get("task_parameters") | ||
| .and_then(|v| v.as_array()) | ||
| .cloned() | ||
| .unwrap_or_default() | ||
| } | ||
|
|
||
| fn create_text_instruction_tasks_from_params(task_params: &[Value]) -> Vec<Task> { | ||
| task_params | ||
| .iter() | ||
| .map(|task_param| { | ||
| let text_instruction = task_param | ||
| .get("text_instruction") | ||
| .and_then(|v| v.as_str()) | ||
| .unwrap_or("") | ||
| .to_string(); | ||
|
|
||
| let payload = json!({ | ||
| "text_instruction": text_instruction | ||
| }); | ||
|
|
||
| Task { | ||
| id: uuid::Uuid::new_v4().to_string(), | ||
| task_type: "text_instruction".to_string(), | ||
| payload, | ||
| } | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| fn create_task_execution_payload(tasks: Vec<Task>, execution_mode: ExecutionMode) -> Value { | ||
| let task_ids: Vec<String> = tasks.iter().map(|task| task.id.clone()).collect(); | ||
| json!({ | ||
| "task_ids": task_ids, | ||
| "execution_mode": execution_mode | ||
| }) | ||
| } | ||
|
|
||
| pub async fn create_dynamic_task(params: Value, tasks_manager: &TasksManager) -> ToolCallResult { | ||
| let task_params_array = extract_task_parameters(¶ms); | ||
|
|
||
| if task_params_array.is_empty() { | ||
| return ToolCallResult::from(Err(ToolError::ExecutionError( | ||
| "No task parameters provided".to_string(), | ||
| ))); | ||
| } | ||
|
|
||
| let tasks = create_text_instruction_tasks_from_params(&task_params_array); | ||
|
|
||
| // Use parallel execution if there are multiple tasks, sequential for single task | ||
| let execution_mode = if tasks.len() > 1 { | ||
| ExecutionMode::Parallel | ||
| } else { | ||
| ExecutionMode::Sequential | ||
| }; | ||
|
|
||
| let task_execution_payload = create_task_execution_payload(tasks.clone(), execution_mode); | ||
|
|
||
| let tasks_json = match serde_json::to_string(&task_execution_payload) { | ||
| Ok(json) => json, | ||
| Err(e) => { | ||
| return ToolCallResult::from(Err(ToolError::ExecutionError(format!( | ||
| "Failed to serialize task list: {}", | ||
| e | ||
| )))) | ||
| } | ||
| }; | ||
| tasks_manager.save_tasks(tasks.clone()).await; | ||
| ToolCallResult::from(Ok(vec![Content::text(tasks_json)])) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.