Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 55 additions & 58 deletions crates/goose/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,9 @@ impl Scheduler {
schedule_sessions.push((session.id.clone(), session));
}
}
schedule_sessions.sort_by(|a, b| b.0.cmp(&a.0));

// Sort by created_at timestamp, newest first
schedule_sessions.sort_by(|a, b| b.1.created_at.cmp(&a.1.created_at));

let result_sessions: Vec<(String, Session)> =
schedule_sessions.into_iter().take(limit).collect();
Expand Down Expand Up @@ -1171,14 +1173,10 @@ async fn run_scheduled_job_internal(
}
};

// Create session upfront for both cases
// Create session upfront
let session = match SessionManager::create_session(
current_dir.clone(),
if recipe.prompt.is_some() {
format!("Scheduled job: {}", job.id)
} else {
"Empty job - no prompt".to_string()
},
format!("Scheduled job: {}", job.id),
)
.await
{
Expand All @@ -1199,65 +1197,64 @@ async fn run_scheduled_job_internal(
}
}

if let Some(ref prompt_text) = recipe.prompt {
let mut conversation =
Conversation::new_unvalidated(vec![Message::user().with_text(prompt_text.clone())]);

let session_config = SessionConfig {
id: session.id.clone(),
working_dir: current_dir.clone(),
schedule_id: Some(job.id.clone()),
execution_mode: job.execution_mode.clone(),
max_turns: None,
retry_config: None,
};
// Use prompt if available, otherwise fall back to instructions
let prompt_text = recipe
.prompt
.as_ref()
.or(recipe.instructions.as_ref())
.unwrap();

let mut conversation =
Conversation::new_unvalidated(vec![Message::user().with_text(prompt_text.clone())]);

let session_config = SessionConfig {
id: session.id.clone(),
working_dir: current_dir.clone(),
schedule_id: Some(job.id.clone()),
execution_mode: job.execution_mode.clone(),
max_turns: None,
retry_config: None,
};

match agent
.reply(conversation.clone(), Some(session_config.clone()), None)
.await
{
Ok(mut stream) => {
use futures::StreamExt;
match agent
.reply(conversation.clone(), Some(session_config.clone()), None)
.await
{
Ok(mut stream) => {
use futures::StreamExt;

while let Some(message_result) = stream.next().await {
tokio::task::yield_now().await;
while let Some(message_result) = stream.next().await {
tokio::task::yield_now().await;

match message_result {
Ok(AgentEvent::Message(msg)) => {
if msg.role == rmcp::model::Role::Assistant {
tracing::info!("[Job {}] Assistant: {:?}", job.id, msg.content);
}
conversation.push(msg);
}
Ok(AgentEvent::McpNotification(_)) => {}
Ok(AgentEvent::ModelChange { .. }) => {}
Ok(AgentEvent::HistoryReplaced(updated_conversation)) => {
conversation = updated_conversation;
}
Err(e) => {
tracing::error!(
"[Job {}] Error receiving message from agent: {}",
job.id,
e
);
break;
match message_result {
Ok(AgentEvent::Message(msg)) => {
if msg.role == rmcp::model::Role::Assistant {
tracing::info!("[Job {}] Assistant: {:?}", job.id, msg.content);
}
conversation.push(msg);
}
Ok(AgentEvent::McpNotification(_)) => {}
Ok(AgentEvent::ModelChange { .. }) => {}
Ok(AgentEvent::HistoryReplaced(updated_conversation)) => {
conversation = updated_conversation;
}
Err(e) => {
tracing::error!(
"[Job {}] Error receiving message from agent: {}",
job.id,
e
);
break;
}
}
}
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Agent failed to reply for recipe '{}': {}", job.source, e),
});
}
}
} else {
tracing::warn!(
"[Job {}] Recipe '{}' has no prompt to execute.",
job.id,
job.source
);
Err(e) => {
return Err(JobExecutionError {
job_id: job.id.clone(),
error: format!("Agent failed to reply for recipe '{}': {}", job.source, e),
});
}
}

if let Err(e) = SessionManager::update_session(&session.id)
Expand Down