diff --git a/crates/goose/src/agents/subagent_handler.rs b/crates/goose/src/agents/subagent_handler.rs index 98e87a524f58..6ba156f839ab 100644 --- a/crates/goose/src/agents/subagent_handler.rs +++ b/crates/goose/src/agents/subagent_handler.rs @@ -26,24 +26,29 @@ struct SubagentPromptContext { type AgentMessagesFuture = Pin)>> + Send>>; -/// Standalone function to run a complete subagent task with output options pub async fn run_complete_subagent_task( recipe: Recipe, task_config: TaskConfig, return_last_only: bool, session_id: String, cancellation_token: Option, + system_prompt_template: Option<&str>, ) -> Result { - let (messages, final_output) = - get_agent_messages(recipe, task_config, session_id, cancellation_token) - .await - .map_err(|e| { - ErrorData::new( - ErrorCode::INTERNAL_ERROR, - format!("Failed to execute task: {}", e), - None, - ) - })?; + let (messages, final_output) = get_agent_messages( + recipe, + task_config, + session_id, + cancellation_token, + system_prompt_template.map(|s| s.to_string()), + ) + .await + .map_err(|e| { + ErrorData::new( + ErrorCode::INTERNAL_ERROR, + format!("Failed to execute task: {}", e), + None, + ) + })?; if let Some(output) = final_output { return Ok(output); @@ -115,6 +120,7 @@ fn get_agent_messages( task_config: TaskConfig, session_id: String, cancellation_token: Option, + system_prompt_template: Option, ) -> AgentMessagesFuture { Box::pin(async move { let text_instruction = recipe @@ -152,25 +158,31 @@ fn get_agent_messages( .apply_recipe_components(recipe.sub_recipes.clone(), recipe.response.clone(), true) .await; - let tools = agent.list_tools(None).await; - let subagent_prompt = render_global_file( - "subagent_system.md", - &SubagentPromptContext { + if let Some(template) = system_prompt_template { + let tools = agent.list_tools(None).await; + let available_tools = tools + .iter() + .map(|t| t.name.to_string()) + .collect::>() + .join(", "); + let context = SubagentPromptContext { max_turns: task_config .max_turns .expect("TaskConfig always sets max_turns"), subagent_id: session_id.clone(), task_instructions: text_instruction.clone(), tool_count: tools.len(), - available_tools: tools - .iter() - .map(|t| t.name.to_string()) - .collect::>() - .join(", "), - }, - ) - .map_err(|e| anyhow!("Failed to render subagent system prompt: {}", e))?; - agent.override_system_prompt(subagent_prompt).await; + available_tools, + }; + let prompt = render_global_file(&template, &context).map_err(|e| { + anyhow!( + "Failed to render system prompt template '{}': {}", + template, + e + ) + })?; + agent.override_system_prompt(prompt).await; + } let user_message = Message::user().with_text(text_instruction); let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]); diff --git a/crates/goose/src/agents/subagent_tool.rs b/crates/goose/src/agents/subagent_tool.rs index c775c070bc15..f70b53b75f10 100644 --- a/crates/goose/src/agents/subagent_tool.rs +++ b/crates/goose/src/agents/subagent_tool.rs @@ -173,9 +173,6 @@ fn get_subrecipe_params_description(sub_recipe: &SubRecipe) -> String { } } -/// Note: SubRecipe.sequential_when_repeated is surfaced as a hint in the tool description -/// (e.g., "[run sequentially, not in parallel]") but not enforced. The LLM controls -/// sequencing by making sequential vs parallel tool calls. pub fn handle_subagent_tool( params: Value, task_config: TaskConfig, @@ -269,6 +266,7 @@ async fn execute_subagent( params.summary, session.id, cancellation_token, + Some("subagent_system.md"), ) .await; diff --git a/crates/goose/src/scheduler.rs b/crates/goose/src/scheduler.rs index d9c01b9700e2..a220149a7a86 100644 --- a/crates/goose/src/scheduler.rs +++ b/crates/goose/src/scheduler.rs @@ -12,12 +12,11 @@ use tokio::sync::Mutex; use tokio_cron_scheduler::{job::JobId, Job, JobScheduler as TokioJobScheduler}; use tokio_util::sync::CancellationToken; -use crate::agents::AgentEvent; -use crate::agents::{Agent, SessionConfig}; +use crate::agents::subagent_handler::run_complete_subagent_task; +use crate::agents::{ExtensionConfig, TaskConfig}; use crate::config::paths::Paths; use crate::config::Config; -use crate::conversation::message::Message; -use crate::conversation::Conversation; +use crate::model::ModelConfig; use crate::providers::create; use crate::recipe::Recipe; use crate::scheduler_trait::SchedulerTrait; @@ -46,7 +45,6 @@ pub enum SchedulerError { JobNotFound(String), StorageError(io::Error), RecipeLoadError(String), - AgentSetupError(String), PersistError(String), CronParseError(String), SchedulerInternalError(String), @@ -60,7 +58,6 @@ impl std::fmt::Display for SchedulerError { SchedulerError::JobNotFound(id) => write!(f, "Job ID '{}' not found.", id), SchedulerError::StorageError(e) => write!(f, "Storage error: {}", e), SchedulerError::RecipeLoadError(e) => write!(f, "Recipe load error: {}", e), - SchedulerError::AgentSetupError(e) => write!(f, "Agent setup error: {}", e), SchedulerError::PersistError(e) => write!(f, "Failed to persist schedules: {}", e), SchedulerError::CronParseError(e) => write!(f, "Invalid cron string: {}", e), SchedulerError::SchedulerInternalError(e) => { @@ -704,36 +701,15 @@ async fn execute_job( return Ok(job.id.to_string()); } - let recipe_path = Path::new(&job.source); - let recipe_content = fs::read_to_string(recipe_path)?; - - let recipe: Recipe = { - let extension = recipe_path - .extension() - .and_then(|s| s.to_str()) - .unwrap_or("yaml") - .to_lowercase(); - - match extension.as_str() { - "json" | "jsonl" => serde_json::from_str(&recipe_content)?, - _ => serde_yaml::from_str(&recipe_content)?, - } - }; - - let agent = Agent::new(); + let recipe = Recipe::from_file_path(Path::new(&job.source))?; let config = Config::global(); let provider_name = config.get_goose_provider()?; let model_name = config.get_goose_model()?; - let model_config = crate::model::ModelConfig::new(&model_name)?; - - let agent_provider = create(&provider_name, model_config).await?; + let model_config = ModelConfig::new(&model_name)?; + let provider = create(&provider_name, model_config).await?; - if let Some(ref extensions) = recipe.extensions { - for ext in extensions { - agent.add_extension(ext.clone()).await?; - } - } + let extensions: Vec = recipe.extensions.clone().unwrap_or_default(); let session = SessionManager::create_session( std::env::current_dir()?, @@ -742,63 +718,31 @@ async fn execute_job( ) .await?; - agent.update_provider(agent_provider, &session.id).await?; - - let mut jobs_guard = jobs.lock().await; - if let Some((_, job_def)) = jobs_guard.get_mut(job_id.as_str()) { - job_def.current_session_id = Some(session.id.clone()); + { + let mut jobs_guard = jobs.lock().await; + if let Some((_, job_def)) = jobs_guard.get_mut(job_id.as_str()) { + job_def.current_session_id = Some(session.id.clone()); + } } - let prompt_text = recipe - .prompt - .as_ref() - .or(recipe.instructions.as_ref()) - .unwrap(); - - let user_message = Message::user().with_text(prompt_text); - let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]); - - let session_config = SessionConfig { - id: session.id.clone(), - schedule_id: Some(job.id.clone()), - max_turns: None, - retry_config: None, - }; + let task_config = TaskConfig::new(provider, &session.id, &std::env::current_dir()?, extensions); - let session_id = session_config.id.clone(); - let stream = crate::session_context::with_session_id(Some(session_id.clone()), async { - agent - .reply(user_message, session_config, Some(cancel_token)) - .await - }) + let _result = run_complete_subagent_task( + recipe.clone(), + task_config, + false, + session.id.clone(), + Some(cancel_token), + None, + ) .await?; - use futures::StreamExt; - let mut stream = std::pin::pin!(stream); - - while let Some(message_result) = stream.next().await { - tokio::task::yield_now().await; - - match message_result { - Ok(AgentEvent::Message(msg)) => { - conversation.push(msg); - } - Ok(AgentEvent::HistoryReplaced(updated)) => { - conversation = updated; - } - Ok(_) => {} - Err(e) => { - tracing::error!("Error in agent stream: {}", e); - break; - } - } - } - SessionManager::update_session(&session.id) .schedule_id(Some(job.id.clone())) .recipe(Some(recipe)) .apply() .await?; + Ok(session.id) }