Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/goose/src/agents/subagent_execution_tool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn handle_recipe_task(
}

tokio::select! {
result = run_complete_subagent_task(recipe, task_config, return_last_only, task.id.clone()) => {
result = run_complete_subagent_task(recipe, task_config, return_last_only, task.id.clone(), None) => {
result.map(|text| serde_json::json!({"result": text}))
.map_err(|e| format!("Recipe execution failed: {}", e))
}
Expand Down
6 changes: 4 additions & 2 deletions crates/goose/src/agents/subagent_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ pub async fn run_complete_subagent_task(
task_config: TaskConfig,
return_last_only: bool,
session_id: String,
schedule_id: Option<String>,
) -> Result<String, anyhow::Error> {
let (messages, final_output) = get_agent_messages(recipe, task_config, session_id)
let (messages, final_output) = get_agent_messages(recipe, task_config, session_id, schedule_id)
.await
.map_err(|e| {
ErrorData::new(
Expand Down Expand Up @@ -99,6 +100,7 @@ fn get_agent_messages(
recipe: Recipe,
task_config: TaskConfig,
session_id: String,
schedule_id: Option<String>,
) -> AgentMessagesFuture {
Box::pin(async move {
let text_instruction = recipe
Expand Down Expand Up @@ -146,7 +148,7 @@ fn get_agent_messages(
}
let session_config = SessionConfig {
id: session_id.clone(),
schedule_id: None,
schedule_id,
max_turns: task_config.max_turns.map(|v| v as u32),
retry_config: recipe.retry,
};
Expand Down
285 changes: 104 additions & 181 deletions crates/goose/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@ use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio_cron_scheduler::{job::JobId, Job, JobScheduler as TokioJobScheduler};

use crate::agents::AgentEvent;
use crate::agents::{Agent, SessionConfig};
use crate::config::paths::Paths;
use crate::config::Config;
use crate::conversation::message::Message;
use crate::conversation::Conversation;
use crate::providers::base::Provider as GooseProvider; // Alias to avoid conflict in test section
use crate::providers::create;
use crate::recipe::Recipe;
Expand Down Expand Up @@ -1051,208 +1047,96 @@ async fn run_scheduled_job_internal(
jobs_arc: Option<Arc<Mutex<JobsMap>>>,
job_id: Option<String>,
) -> std::result::Result<String, JobExecutionError> {
tracing::info!("Executing job: {} (Source: {})", job.id, job.source);
use crate::agents::subagent_handler::run_complete_subagent_task;
use crate::agents::TaskConfig;

let recipe_path = Path::new(&job.source);

let recipe_content = match fs::read_to_string(recipe_path) {
Ok(content) => content,
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to load recipe file '{}': {}", job.source, e),
});
}
};

let recipe: Recipe = {
let extension = recipe_path
.extension()
.and_then(|os_str| os_str.to_str())
.unwrap_or("yaml")
.to_lowercase();

match extension.as_str() {
"json" | "jsonl" => {
serde_json::from_str::<Recipe>(&recipe_content).map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to parse JSON recipe '{}': {}", job.source, e),
})
}
"yaml" | "yml" => {
serde_yaml::from_str::<Recipe>(&recipe_content).map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to parse YAML recipe '{}': {}", job.source, e),
})
}
_ => Err(JobExecutionError {
job_id: job.id.clone(),
error: format!(
"Unsupported recipe file extension '{}' for: {}",
extension, job.source
),
}),
}
}?;

let agent: Agent = Agent::new();

let agent_provider: Arc<dyn GooseProvider>;
tracing::info!(
"Executing scheduled job: {} (Source: {})",
job.id,
job.source
);

if let Some(provider) = provider_override {
agent_provider = provider;
} else {
let global_config = Config::global();
let provider_name: String = match global_config.get_goose_provider() {
Ok(name) => name,
Err(_) => return Err(JobExecutionError {
job_id: job.id.clone(),
error:
"GOOSE_PROVIDER not configured globally. Run 'goose configure' or set env var."
.to_string(),
}),
};
let model_name: String =
match global_config.get_goose_model() {
Ok(name) => name,
Err(_) => return Err(JobExecutionError {
job_id: job.id.clone(),
error:
"GOOSE_MODEL not configured globally. Run 'goose configure' or set env var."
.to_string(),
}),
};
let model_config =
crate::model::ModelConfig::new(model_name.as_str()).map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Model config error: {}", e),
})?;

agent_provider =
create(&provider_name, model_config)
.await
.map_err(|e| JobExecutionError {
let recipe_path = Path::new(&job.source);
let recipe_content = fs::read_to_string(recipe_path).map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to load recipe file '{}': {}", job.source, e),
})?;

let recipe: Recipe = Recipe::from_content(&recipe_content).map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to parse recipe '{}': {}", job.source, e),
})?;

let provider = if let Some(provider) = provider_override {
provider
} else if let Some(settings) = &recipe.settings {
if let (Some(provider_name), Some(model_name)) =
(&settings.goose_provider, &settings.goose_model)
{
let mut model_config =
crate::model::ModelConfig::new(model_name).map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!(
"Failed to create provider instance '{}': {}",
provider_name, e
),
error: format!("Invalid model config: {}", e),
})?;
}

if let Some(ref recipe_extensions) = recipe.extensions {
for extension in recipe_extensions {
agent
.add_extension(extension.clone())
if let Some(temp) = settings.temperature {
model_config = model_config.with_temperature(Some(temp));
}
create(provider_name, model_config)
.await
.map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to add extension '{}': {}", extension.name(), e),
})?;
}
}

if let Err(e) = agent.update_provider(agent_provider).await {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to set provider on agent: {}", e),
});
}
tracing::info!("Agent configured with provider for job '{}'", job.id);

let current_dir = match std::env::current_dir() {
Ok(cd) => cd,
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to get current directory for job execution: {}", e),
});
error: format!("Failed to create provider from recipe settings: {}", e),
})?
} else {
get_default_provider(&job.id).await?
}
} else {
get_default_provider(&job.id).await?
};

let session = match SessionManager::create_session(
current_dir.clone(),
let working_dir = std::env::current_dir().map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to get working directory: {}", e),
})?;

let session = SessionManager::create_session(
working_dir.clone(),
format!("Scheduled job: {}", job.id),
SessionType::Scheduled,
)
.await
{
Ok(s) => s,
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to create session: {}", e),
});
}
};
.map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Failed to create session: {}", e),
})?;

// Update the job with the session ID if we have access to the jobs arc
if let (Some(jobs_arc), Some(job_id_str)) = (jobs_arc.as_ref(), job_id.as_ref()) {
let mut jobs_guard = jobs_arc.lock().await;
if let Some((_, job_def)) = jobs_guard.get_mut(job_id_str) {
job_def.current_session_id = Some(session.id.clone());
}
}

// Use prompt if available, otherwise fall back to instructions
let prompt_text = recipe
.prompt
.as_ref()
.or(recipe.instructions.as_ref())
.unwrap();

let user_message = Message::user().with_text(prompt_text);
let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]);

let session_config = SessionConfig {
id: session.id.clone(),
schedule_id: Some(job.id.clone()),
let task_config = TaskConfig {
provider,
parent_session_id: session.id.clone(),
parent_working_dir: working_dir,
extensions: recipe.extensions.clone().unwrap_or_default(),
max_turns: None,
retry_config: None,
};

let session_id = Some(session_config.id.clone());
match crate::session_context::with_session_id(session_id, async {
agent.reply(user_message, session_config, None).await
})
let _result = run_complete_subagent_task(
recipe.clone(),
task_config,
false,
session.id.clone(),
Some(job.id.clone()),
)
.await
{
Ok(mut stream) => {
use futures::StreamExt;

while let Some(message_result) = stream.next().await {
tokio::task::yield_now().await;

match message_result {
Ok(AgentEvent::Message(msg)) => {
if msg.role == rmcp::model::Role::Assistant {
tracing::info!("[Job {}] Assistant: {:?}", job.id, msg.content);
}
conversation.push(msg);
}
Ok(AgentEvent::McpNotification(_)) => {}
Ok(AgentEvent::ModelChange { .. }) => {}
Ok(AgentEvent::HistoryReplaced(updated_conversation)) => {
conversation = updated_conversation;
}
Err(e) => {
tracing::error!(
"[Job {}] Error receiving message from agent: {}",
job.id,
e
);
break;
}
}
}
}
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Agent failed to reply for recipe '{}': {}", job.source, e),
});
}
}
.map_err(|e| JobExecutionError {
job_id: job.id.clone(),
error: format!("Recipe execution failed: {}", e),
})?;

if let Err(e) = SessionManager::update_session(&session.id)
.schedule_id(Some(job.id.clone()))
Expand All @@ -1263,10 +1147,49 @@ async fn run_scheduled_job_internal(
tracing::error!("[Job {}] Failed to update session metadata: {}", job.id, e);
}

tracing::info!("Finished job: {}", job.id);
tracing::info!(
"Scheduled job {} completed, session: {}",
job.id,
session.id
);
Ok(session.id)
}

// Helper function to get default provider from global config
async fn get_default_provider(
job_id: &str,
) -> std::result::Result<Arc<dyn GooseProvider>, JobExecutionError> {
let global_config = Config::global();
let provider_name = global_config
.get_goose_provider()
.map_err(|_| JobExecutionError {
job_id: job_id.to_string(),
error: "GOOSE_PROVIDER not configured globally. Run 'goose configure' or set env var."
.to_string(),
})?;

let model_name = global_config
.get_goose_model()
.map_err(|_| JobExecutionError {
job_id: job_id.to_string(),
error: "GOOSE_MODEL not configured globally. Run 'goose configure' or set env var."
.to_string(),
})?;

let model_config =
crate::model::ModelConfig::new(&model_name).map_err(|e| JobExecutionError {
job_id: job_id.to_string(),
error: format!("Model config error: {}", e),
})?;

create(&provider_name, model_config)
.await
.map_err(|e| JobExecutionError {
job_id: job_id.to_string(),
error: format!("Failed to create provider: {}", e),
})
}

#[async_trait]
impl SchedulerTrait for Scheduler {
async fn add_scheduled_job(&self, job: ScheduledJob) -> Result<(), SchedulerError> {
Expand Down