diff --git a/crates/goose/src/agents/subagent_execution_tool/tasks.rs b/crates/goose/src/agents/subagent_execution_tool/tasks.rs index 81269b80dcec..7e637cb059a5 100644 --- a/crates/goose/src/agents/subagent_execution_tool/tasks.rs +++ b/crates/goose/src/agents/subagent_execution_tool/tasks.rs @@ -83,7 +83,7 @@ async fn handle_recipe_task( } tokio::select! { - result = run_complete_subagent_task(recipe, task_config, return_last_only, task.id.clone()) => { + result = run_complete_subagent_task(recipe, task_config, return_last_only, task.id.clone(), None) => { result.map(|text| serde_json::json!({"result": text})) .map_err(|e| format!("Recipe execution failed: {}", e)) } diff --git a/crates/goose/src/agents/subagent_handler.rs b/crates/goose/src/agents/subagent_handler.rs index 0ab0310a7837..6a9eceede29f 100644 --- a/crates/goose/src/agents/subagent_handler.rs +++ b/crates/goose/src/agents/subagent_handler.rs @@ -20,8 +20,9 @@ pub async fn run_complete_subagent_task( task_config: TaskConfig, return_last_only: bool, session_id: String, + schedule_id: Option, ) -> Result { - let (messages, final_output) = get_agent_messages(recipe, task_config, session_id) + let (messages, final_output) = get_agent_messages(recipe, task_config, session_id, schedule_id) .await .map_err(|e| { ErrorData::new( @@ -99,6 +100,7 @@ fn get_agent_messages( recipe: Recipe, task_config: TaskConfig, session_id: String, + schedule_id: Option, ) -> AgentMessagesFuture { Box::pin(async move { let text_instruction = recipe @@ -146,7 +148,7 @@ fn get_agent_messages( } let session_config = SessionConfig { id: session_id.clone(), - schedule_id: None, + schedule_id, max_turns: task_config.max_turns.map(|v| v as u32), retry_config: recipe.retry, }; diff --git a/crates/goose/src/scheduler.rs b/crates/goose/src/scheduler.rs index 509450502ee5..9711c3ce4b28 100644 --- a/crates/goose/src/scheduler.rs +++ b/crates/goose/src/scheduler.rs @@ -11,12 +11,8 @@ use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; use tokio_cron_scheduler::{job::JobId, Job, JobScheduler as TokioJobScheduler}; -use crate::agents::AgentEvent; -use crate::agents::{Agent, SessionConfig}; use crate::config::paths::Paths; use crate::config::Config; -use crate::conversation::message::Message; -use crate::conversation::Conversation; use crate::providers::base::Provider as GooseProvider; // Alias to avoid conflict in test section use crate::providers::create; use crate::recipe::Recipe; @@ -1051,142 +1047,69 @@ async fn run_scheduled_job_internal( jobs_arc: Option>>, job_id: Option, ) -> std::result::Result { - tracing::info!("Executing job: {} (Source: {})", job.id, job.source); + use crate::agents::subagent_handler::run_complete_subagent_task; + use crate::agents::TaskConfig; - let recipe_path = Path::new(&job.source); - - let recipe_content = match fs::read_to_string(recipe_path) { - Ok(content) => content, - Err(e) => { - return Err(JobExecutionError { - job_id: job.id.clone(), - error: format!("Failed to load recipe file '{}': {}", job.source, e), - }); - } - }; - - let recipe: Recipe = { - let extension = recipe_path - .extension() - .and_then(|os_str| os_str.to_str()) - .unwrap_or("yaml") - .to_lowercase(); - - match extension.as_str() { - "json" | "jsonl" => { - serde_json::from_str::(&recipe_content).map_err(|e| JobExecutionError { - job_id: job.id.clone(), - error: format!("Failed to parse JSON recipe '{}': {}", job.source, e), - }) - } - "yaml" | "yml" => { - serde_yaml::from_str::(&recipe_content).map_err(|e| JobExecutionError { - job_id: job.id.clone(), - error: format!("Failed to parse YAML recipe '{}': {}", job.source, e), - }) - } - _ => Err(JobExecutionError { - job_id: job.id.clone(), - error: format!( - "Unsupported recipe file extension '{}' for: {}", - extension, job.source - ), - }), - } - }?; - - let agent: Agent = Agent::new(); - - let agent_provider: Arc; + tracing::info!( + "Executing scheduled job: {} (Source: {})", + job.id, + job.source + ); - if let Some(provider) = provider_override { - agent_provider = provider; - } else { - let global_config = Config::global(); - let provider_name: String = match global_config.get_goose_provider() { - Ok(name) => name, - Err(_) => return Err(JobExecutionError { - job_id: job.id.clone(), - error: - "GOOSE_PROVIDER not configured globally. Run 'goose configure' or set env var." - .to_string(), - }), - }; - let model_name: String = - match global_config.get_goose_model() { - Ok(name) => name, - Err(_) => return Err(JobExecutionError { - job_id: job.id.clone(), - error: - "GOOSE_MODEL not configured globally. Run 'goose configure' or set env var." - .to_string(), - }), - }; - let model_config = - crate::model::ModelConfig::new(model_name.as_str()).map_err(|e| JobExecutionError { - job_id: job.id.clone(), - error: format!("Model config error: {}", e), - })?; - - agent_provider = - create(&provider_name, model_config) - .await - .map_err(|e| JobExecutionError { + let recipe_path = Path::new(&job.source); + let recipe_content = fs::read_to_string(recipe_path).map_err(|e| JobExecutionError { + job_id: job.id.clone(), + error: format!("Failed to load recipe file '{}': {}", job.source, e), + })?; + + let recipe: Recipe = Recipe::from_content(&recipe_content).map_err(|e| JobExecutionError { + job_id: job.id.clone(), + error: format!("Failed to parse recipe '{}': {}", job.source, e), + })?; + + let provider = if let Some(provider) = provider_override { + provider + } else if let Some(settings) = &recipe.settings { + if let (Some(provider_name), Some(model_name)) = + (&settings.goose_provider, &settings.goose_model) + { + let mut model_config = + crate::model::ModelConfig::new(model_name).map_err(|e| JobExecutionError { job_id: job.id.clone(), - error: format!( - "Failed to create provider instance '{}': {}", - provider_name, e - ), + error: format!("Invalid model config: {}", e), })?; - } - - if let Some(ref recipe_extensions) = recipe.extensions { - for extension in recipe_extensions { - agent - .add_extension(extension.clone()) + if let Some(temp) = settings.temperature { + model_config = model_config.with_temperature(Some(temp)); + } + create(provider_name, model_config) .await .map_err(|e| JobExecutionError { job_id: job.id.clone(), - error: format!("Failed to add extension '{}': {}", extension.name(), e), - })?; - } - } - - if let Err(e) = agent.update_provider(agent_provider).await { - return Err(JobExecutionError { - job_id: job.id.clone(), - error: format!("Failed to set provider on agent: {}", e), - }); - } - tracing::info!("Agent configured with provider for job '{}'", job.id); - - let current_dir = match std::env::current_dir() { - Ok(cd) => cd, - Err(e) => { - return Err(JobExecutionError { - job_id: job.id.clone(), - error: format!("Failed to get current directory for job execution: {}", e), - }); + error: format!("Failed to create provider from recipe settings: {}", e), + })? + } else { + get_default_provider(&job.id).await? } + } else { + get_default_provider(&job.id).await? }; - let session = match SessionManager::create_session( - current_dir.clone(), + let working_dir = std::env::current_dir().map_err(|e| JobExecutionError { + job_id: job.id.clone(), + error: format!("Failed to get working directory: {}", e), + })?; + + let session = SessionManager::create_session( + working_dir.clone(), format!("Scheduled job: {}", job.id), SessionType::Scheduled, ) .await - { - Ok(s) => s, - Err(e) => { - return Err(JobExecutionError { - job_id: job.id.clone(), - error: format!("Failed to create session: {}", e), - }); - } - }; + .map_err(|e| JobExecutionError { + job_id: job.id.clone(), + error: format!("Failed to create session: {}", e), + })?; - // Update the job with the session ID if we have access to the jobs arc if let (Some(jobs_arc), Some(job_id_str)) = (jobs_arc.as_ref(), job_id.as_ref()) { let mut jobs_guard = jobs_arc.lock().await; if let Some((_, job_def)) = jobs_guard.get_mut(job_id_str) { @@ -1194,65 +1117,26 @@ async fn run_scheduled_job_internal( } } - // Use prompt if available, otherwise fall back to instructions - let prompt_text = recipe - .prompt - .as_ref() - .or(recipe.instructions.as_ref()) - .unwrap(); - - let user_message = Message::user().with_text(prompt_text); - let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]); - - let session_config = SessionConfig { - id: session.id.clone(), - schedule_id: Some(job.id.clone()), + let task_config = TaskConfig { + provider, + parent_session_id: session.id.clone(), + parent_working_dir: working_dir, + extensions: recipe.extensions.clone().unwrap_or_default(), max_turns: None, - retry_config: None, }; - let session_id = Some(session_config.id.clone()); - match crate::session_context::with_session_id(session_id, async { - agent.reply(user_message, session_config, None).await - }) + let _result = run_complete_subagent_task( + recipe.clone(), + task_config, + false, + session.id.clone(), + Some(job.id.clone()), + ) .await - { - Ok(mut stream) => { - use futures::StreamExt; - - while let Some(message_result) = stream.next().await { - tokio::task::yield_now().await; - - match message_result { - Ok(AgentEvent::Message(msg)) => { - if msg.role == rmcp::model::Role::Assistant { - tracing::info!("[Job {}] Assistant: {:?}", job.id, msg.content); - } - conversation.push(msg); - } - Ok(AgentEvent::McpNotification(_)) => {} - Ok(AgentEvent::ModelChange { .. }) => {} - Ok(AgentEvent::HistoryReplaced(updated_conversation)) => { - conversation = updated_conversation; - } - Err(e) => { - tracing::error!( - "[Job {}] Error receiving message from agent: {}", - job.id, - e - ); - break; - } - } - } - } - Err(e) => { - return Err(JobExecutionError { - job_id: job.id.clone(), - error: format!("Agent failed to reply for recipe '{}': {}", job.source, e), - }); - } - } + .map_err(|e| JobExecutionError { + job_id: job.id.clone(), + error: format!("Recipe execution failed: {}", e), + })?; if let Err(e) = SessionManager::update_session(&session.id) .schedule_id(Some(job.id.clone())) @@ -1263,10 +1147,49 @@ async fn run_scheduled_job_internal( tracing::error!("[Job {}] Failed to update session metadata: {}", job.id, e); } - tracing::info!("Finished job: {}", job.id); + tracing::info!( + "Scheduled job {} completed, session: {}", + job.id, + session.id + ); Ok(session.id) } +// Helper function to get default provider from global config +async fn get_default_provider( + job_id: &str, +) -> std::result::Result, JobExecutionError> { + let global_config = Config::global(); + let provider_name = global_config + .get_goose_provider() + .map_err(|_| JobExecutionError { + job_id: job_id.to_string(), + error: "GOOSE_PROVIDER not configured globally. Run 'goose configure' or set env var." + .to_string(), + })?; + + let model_name = global_config + .get_goose_model() + .map_err(|_| JobExecutionError { + job_id: job_id.to_string(), + error: "GOOSE_MODEL not configured globally. Run 'goose configure' or set env var." + .to_string(), + })?; + + let model_config = + crate::model::ModelConfig::new(&model_name).map_err(|e| JobExecutionError { + job_id: job_id.to_string(), + error: format!("Model config error: {}", e), + })?; + + create(&provider_name, model_config) + .await + .map_err(|e| JobExecutionError { + job_id: job_id.to_string(), + error: format!("Failed to create provider: {}", e), + }) +} + #[async_trait] impl SchedulerTrait for Scheduler { async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError> {