Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions crates/goose/src/agents/subagent_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,29 @@ struct SubagentPromptContext {
type AgentMessagesFuture =
Pin<Box<dyn Future<Output = Result<(Conversation, Option<String>)>> + Send>>;

/// Standalone function to run a complete subagent task with output options
pub async fn run_complete_subagent_task(
recipe: Recipe,
task_config: TaskConfig,
return_last_only: bool,
session_id: String,
cancellation_token: Option<CancellationToken>,
system_prompt_template: Option<&str>,
) -> Result<String, anyhow::Error> {
let (messages, final_output) =
get_agent_messages(recipe, task_config, session_id, cancellation_token)
.await
.map_err(|e| {
ErrorData::new(
ErrorCode::INTERNAL_ERROR,
format!("Failed to execute task: {}", e),
None,
)
})?;
let (messages, final_output) = get_agent_messages(
recipe,
task_config,
session_id,
cancellation_token,
system_prompt_template.map(|s| s.to_string()),
)
.await
.map_err(|e| {
ErrorData::new(
ErrorCode::INTERNAL_ERROR,
format!("Failed to execute task: {}", e),
None,
)
})?;

if let Some(output) = final_output {
return Ok(output);
Expand Down Expand Up @@ -115,6 +120,7 @@ fn get_agent_messages(
task_config: TaskConfig,
session_id: String,
cancellation_token: Option<CancellationToken>,
system_prompt_template: Option<String>,
) -> AgentMessagesFuture {
Box::pin(async move {
let text_instruction = recipe
Expand Down Expand Up @@ -152,25 +158,31 @@ fn get_agent_messages(
.apply_recipe_components(recipe.sub_recipes.clone(), recipe.response.clone(), true)
.await;

let tools = agent.list_tools(None).await;
let subagent_prompt = render_global_file(
"subagent_system.md",
&SubagentPromptContext {
if let Some(template) = system_prompt_template {
let tools = agent.list_tools(None).await;
let available_tools = tools
.iter()
.map(|t| t.name.to_string())
.collect::<Vec<_>>()
.join(", ");
let context = SubagentPromptContext {
max_turns: task_config
.max_turns
.expect("TaskConfig always sets max_turns"),
subagent_id: session_id.clone(),
task_instructions: text_instruction.clone(),
tool_count: tools.len(),
available_tools: tools
.iter()
.map(|t| t.name.to_string())
.collect::<Vec<_>>()
.join(", "),
},
)
.map_err(|e| anyhow!("Failed to render subagent system prompt: {}", e))?;
agent.override_system_prompt(subagent_prompt).await;
available_tools,
};
let prompt = render_global_file(&template, &context).map_err(|e| {
anyhow!(
"Failed to render system prompt template '{}': {}",
template,
e
)
})?;
agent.override_system_prompt(prompt).await;
}

let user_message = Message::user().with_text(text_instruction);
let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]);
Expand Down
4 changes: 1 addition & 3 deletions crates/goose/src/agents/subagent_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,6 @@ fn get_subrecipe_params_description(sub_recipe: &SubRecipe) -> String {
}
}

/// Note: SubRecipe.sequential_when_repeated is surfaced as a hint in the tool description
/// (e.g., "[run sequentially, not in parallel]") but not enforced. The LLM controls
/// sequencing by making sequential vs parallel tool calls.
pub fn handle_subagent_tool(
params: Value,
task_config: TaskConfig,
Expand Down Expand Up @@ -269,6 +266,7 @@ async fn execute_subagent(
params.summary,
session.id,
cancellation_token,
Some("subagent_system.md"),
)
.await;

Expand Down
100 changes: 22 additions & 78 deletions crates/goose/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ use tokio::sync::Mutex;
use tokio_cron_scheduler::{job::JobId, Job, JobScheduler as TokioJobScheduler};
use tokio_util::sync::CancellationToken;

use crate::agents::AgentEvent;
use crate::agents::{Agent, SessionConfig};
use crate::agents::subagent_handler::run_complete_subagent_task;
use crate::agents::{ExtensionConfig, TaskConfig};
use crate::config::paths::Paths;
use crate::config::Config;
use crate::conversation::message::Message;
use crate::conversation::Conversation;
use crate::model::ModelConfig;
use crate::providers::create;
use crate::recipe::Recipe;
use crate::scheduler_trait::SchedulerTrait;
Expand Down Expand Up @@ -46,7 +45,6 @@ pub enum SchedulerError {
JobNotFound(String),
StorageError(io::Error),
RecipeLoadError(String),
AgentSetupError(String),
PersistError(String),
CronParseError(String),
SchedulerInternalError(String),
Expand All @@ -60,7 +58,6 @@ impl std::fmt::Display for SchedulerError {
SchedulerError::JobNotFound(id) => write!(f, "Job ID '{}' not found.", id),
SchedulerError::StorageError(e) => write!(f, "Storage error: {}", e),
SchedulerError::RecipeLoadError(e) => write!(f, "Recipe load error: {}", e),
SchedulerError::AgentSetupError(e) => write!(f, "Agent setup error: {}", e),
SchedulerError::PersistError(e) => write!(f, "Failed to persist schedules: {}", e),
SchedulerError::CronParseError(e) => write!(f, "Invalid cron string: {}", e),
SchedulerError::SchedulerInternalError(e) => {
Expand Down Expand Up @@ -704,36 +701,15 @@ async fn execute_job(
return Ok(job.id.to_string());
}

let recipe_path = Path::new(&job.source);
let recipe_content = fs::read_to_string(recipe_path)?;

let recipe: Recipe = {
let extension = recipe_path
.extension()
.and_then(|s| s.to_str())
.unwrap_or("yaml")
.to_lowercase();

match extension.as_str() {
"json" | "jsonl" => serde_json::from_str(&recipe_content)?,
_ => serde_yaml::from_str(&recipe_content)?,
}
};

let agent = Agent::new();
let recipe = Recipe::from_file_path(Path::new(&job.source))?;

let config = Config::global();
let provider_name = config.get_goose_provider()?;
let model_name = config.get_goose_model()?;
let model_config = crate::model::ModelConfig::new(&model_name)?;

let agent_provider = create(&provider_name, model_config).await?;
let model_config = ModelConfig::new(&model_name)?;
let provider = create(&provider_name, model_config).await?;

if let Some(ref extensions) = recipe.extensions {
for ext in extensions {
agent.add_extension(ext.clone()).await?;
}
}
let extensions: Vec<ExtensionConfig> = recipe.extensions.clone().unwrap_or_default();

let session = SessionManager::create_session(
std::env::current_dir()?,
Expand All @@ -742,63 +718,31 @@ async fn execute_job(
)
.await?;

agent.update_provider(agent_provider, &session.id).await?;

let mut jobs_guard = jobs.lock().await;
if let Some((_, job_def)) = jobs_guard.get_mut(job_id.as_str()) {
job_def.current_session_id = Some(session.id.clone());
{
let mut jobs_guard = jobs.lock().await;
if let Some((_, job_def)) = jobs_guard.get_mut(job_id.as_str()) {
job_def.current_session_id = Some(session.id.clone());
}
}

let prompt_text = recipe
.prompt
.as_ref()
.or(recipe.instructions.as_ref())
.unwrap();

let user_message = Message::user().with_text(prompt_text);
let mut conversation = Conversation::new_unvalidated(vec![user_message.clone()]);

let session_config = SessionConfig {
id: session.id.clone(),
schedule_id: Some(job.id.clone()),
max_turns: None,
retry_config: None,
};
let task_config = TaskConfig::new(provider, &session.id, &std::env::current_dir()?, extensions);

let session_id = session_config.id.clone();
let stream = crate::session_context::with_session_id(Some(session_id.clone()), async {
agent
.reply(user_message, session_config, Some(cancel_token))
.await
})
let _result = run_complete_subagent_task(
recipe.clone(),
task_config,
false,
session.id.clone(),
Some(cancel_token),
None,
)
.await?;

use futures::StreamExt;
let mut stream = std::pin::pin!(stream);

while let Some(message_result) = stream.next().await {
tokio::task::yield_now().await;

match message_result {
Ok(AgentEvent::Message(msg)) => {
conversation.push(msg);
}
Ok(AgentEvent::HistoryReplaced(updated)) => {
conversation = updated;
}
Ok(_) => {}
Err(e) => {
tracing::error!("Error in agent stream: {}", e);
break;
}
}
}

SessionManager::update_session(&session.id)
.schedule_id(Some(job.id.clone()))
.recipe(Some(recipe))
.apply()
.await?;

Ok(session.id)
}

Expand Down
Loading