diff --git a/.github/workflows/build-cli.yml b/.github/workflows/build-cli.yml index f91360a21f56..b18eb6c02c2a 100644 --- a/.github/workflows/build-cli.yml +++ b/.github/workflows/build-cli.yml @@ -17,7 +17,7 @@ on: ref: type: string required: false - default: 'refs/heads/main' + default: "" name: "Reusable workflow to build CLI" diff --git a/Cargo.lock b/Cargo.lock index e84e17b8f8a6..d0afe8473451 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3434,7 +3434,7 @@ dependencies = [ [[package]] name = "goose" -version = "1.0.36" +version = "1.1.4" dependencies = [ "ahash", "anyhow", @@ -3499,7 +3499,7 @@ dependencies = [ [[package]] name = "goose-bench" -version = "1.0.36" +version = "1.1.4" dependencies = [ "anyhow", "async-trait", @@ -3523,7 +3523,7 @@ dependencies = [ [[package]] name = "goose-cli" -version = "1.0.36" +version = "1.1.4" dependencies = [ "anyhow", "async-trait", @@ -3575,7 +3575,7 @@ dependencies = [ [[package]] name = "goose-ffi" -version = "1.0.36" +version = "1.1.4" dependencies = [ "cbindgen", "futures", @@ -3589,7 +3589,7 @@ dependencies = [ [[package]] name = "goose-llm" -version = "1.0.36" +version = "1.1.4" dependencies = [ "anyhow", "async-trait", @@ -3619,7 +3619,7 @@ dependencies = [ [[package]] name = "goose-mcp" -version = "1.0.36" +version = "1.1.4" dependencies = [ "anyhow", "async-trait", @@ -3671,7 +3671,7 @@ dependencies = [ [[package]] name = "goose-server" -version = "1.0.36" +version = "1.1.4" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index a386d2647d34..b5b240e4c5ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "1.0.36" +version = "1.1.4" authors = ["Block "] license = "Apache-2.0" repository = "https://github.com/block/goose" diff --git a/crates/goose-cli/src/commands/web.rs b/crates/goose-cli/src/commands/web.rs index 72b5d32bda9d..e61975ad1ada 100644 --- a/crates/goose-cli/src/commands/web.rs +++ b/crates/goose-cli/src/commands/web.rs @@ -475,7 +475,14 @@ async fn process_message_streaming( } let provider = provider.unwrap(); - session::persist_messages(&session_file, &messages, Some(provider.clone())).await?; + let working_dir = Some(std::env::current_dir()?); + session::persist_messages( + &session_file, + &messages, + Some(provider.clone()), + working_dir.clone(), + ) + .await?; // Create a session config let session_config = SessionConfig { @@ -503,7 +510,13 @@ async fn process_message_streaming( let session_msgs = session_messages.lock().await; session_msgs.clone() }; - session::persist_messages(&session_file, ¤t_messages, None).await?; + session::persist_messages( + &session_file, + ¤t_messages, + None, + working_dir.clone(), + ) + .await?; // Handle different message content types for content in &message.content { match content { diff --git a/crates/goose-cli/src/session/mod.rs b/crates/goose-cli/src/session/mod.rs index c6ccb09fe3b4..434135b48fae 100644 --- a/crates/goose-cli/src/session/mod.rs +++ b/crates/goose-cli/src/session/mod.rs @@ -364,11 +364,16 @@ impl Session { // Persist messages with provider for automatic description generation if let Some(session_file) = &self.session_file { + let working_dir = Some( + std::env::current_dir().expect("failed to get current session working directory"), + ); + session::persist_messages_with_schedule_id( session_file, &self.messages, Some(provider), self.scheduled_job_id.clone(), + working_dir, ) .await?; } @@ -486,11 +491,17 @@ impl Session { // Persist messages with provider for automatic description generation if let Some(session_file) = &self.session_file { + let working_dir = Some( + std::env::current_dir() + .expect("failed to get current session working directory"), + ); + session::persist_messages_with_schedule_id( session_file, &self.messages, Some(provider), self.scheduled_job_id.clone(), + working_dir, ) .await?; } @@ -702,11 +713,13 @@ impl Session { // Persist the summarized messages if let Some(session_file) = &self.session_file { + let working_dir = std::env::current_dir().ok(); session::persist_messages_with_schedule_id( session_file, &self.messages, Some(provider), self.scheduled_job_id.clone(), + working_dir, ) .await?; } @@ -886,11 +899,13 @@ impl Session { )); push_message(&mut self.messages, response_message); if let Some(session_file) = &self.session_file { + let working_dir = std::env::current_dir().ok(); session::persist_messages_with_schedule_id( session_file, &self.messages, None, self.scheduled_job_id.clone(), + working_dir, ) .await?; } @@ -985,11 +1000,13 @@ impl Session { // No need to update description on assistant messages if let Some(session_file) = &self.session_file { + let working_dir = std::env::current_dir().ok(); session::persist_messages_with_schedule_id( session_file, &self.messages, None, self.scheduled_job_id.clone(), + working_dir, ) .await?; } @@ -1188,11 +1205,13 @@ impl Session { // No need for description update here if let Some(session_file) = &self.session_file { + let working_dir = std::env::current_dir().ok(); session::persist_messages_with_schedule_id( session_file, &self.messages, None, self.scheduled_job_id.clone(), + working_dir, ) .await?; } @@ -1205,11 +1224,13 @@ impl Session { // No need for description update here if let Some(session_file) = &self.session_file { + let working_dir = std::env::current_dir().ok(); session::persist_messages_with_schedule_id( session_file, &self.messages, None, self.scheduled_job_id.clone(), + working_dir, ) .await?; } @@ -1227,11 +1248,13 @@ impl Session { // No need for description update here if let Some(session_file) = &self.session_file { + let working_dir = std::env::current_dir().ok(); session::persist_messages_with_schedule_id( session_file, &self.messages, None, self.scheduled_job_id.clone(), + working_dir, ) .await?; } diff --git a/crates/goose-server/src/routes/reply.rs b/crates/goose-server/src/routes/reply.rs index 55d04d955282..1f7f058814ab 100644 --- a/crates/goose-server/src/routes/reply.rs +++ b/crates/goose-server/src/routes/reply.rs @@ -123,7 +123,7 @@ async fn handler( let stream = ReceiverStream::new(rx); let messages = request.messages; - let session_working_dir = request.session_working_dir; + let session_working_dir = request.session_working_dir.clone(); let session_id = request .session_id @@ -181,7 +181,7 @@ async fn handler( &messages, Some(SessionConfig { id: session::Identifier::Name(session_id.clone()), - working_dir: PathBuf::from(session_working_dir), + working_dir: PathBuf::from(&session_working_dir), schedule_id: request.scheduled_job_id.clone(), execution_mode: None, max_turns: None, @@ -297,8 +297,13 @@ async fn handler( if all_messages.len() > saved_message_count { let provider = Arc::clone(provider.as_ref().unwrap()); tokio::spawn(async move { - if let Err(e) = - session::persist_messages(&session_path, &all_messages, Some(provider)).await + if let Err(e) = session::persist_messages( + &session_path, + &all_messages, + Some(provider), + Some(PathBuf::from(&session_working_dir)), + ) + .await { tracing::error!("Failed to store session history: {:?}", e); } @@ -337,7 +342,7 @@ async fn ask_handler( ) -> Result, StatusCode> { verify_secret_key(&headers, &state)?; - let session_working_dir = request.session_working_dir; + let session_working_dir = request.session_working_dir.clone(); let session_id = request .session_id @@ -358,7 +363,7 @@ async fn ask_handler( &messages, Some(SessionConfig { id: session::Identifier::Name(session_id.clone()), - working_dir: PathBuf::from(session_working_dir), + working_dir: PathBuf::from(&session_working_dir), schedule_id: request.scheduled_job_id.clone(), execution_mode: None, max_turns: None, @@ -420,9 +425,15 @@ async fn ask_handler( let session_path_clone = session_path.clone(); let messages = all_messages.clone(); let provider = Arc::clone(provider.as_ref().unwrap()); + let session_working_dir_clone = session_working_dir.clone(); tokio::spawn(async move { - if let Err(e) = - session::persist_messages(&session_path_clone, &messages, Some(provider)).await + if let Err(e) = session::persist_messages( + &session_path_clone, + &messages, + Some(provider), + Some(PathBuf::from(session_working_dir_clone)), + ) + .await { tracing::error!("Failed to store session history: {:?}", e); } diff --git a/crates/goose-server/src/routes/session.rs b/crates/goose-server/src/routes/session.rs index 8ed509e46f3d..cafc23d496b8 100644 --- a/crates/goose-server/src/routes/session.rs +++ b/crates/goose-server/src/routes/session.rs @@ -187,14 +187,20 @@ async fn get_session_insights( // Track tokens - only add positive values to prevent negative totals if let Some(tokens) = session.metadata.accumulated_total_tokens { - if tokens > 0 { - total_tokens += tokens as i64; - } else if tokens < 0 { - // Log negative token values for debugging - info!( - "Warning: Session {} has negative accumulated_total_tokens: {}", - session.id, tokens - ); + match tokens.cmp(&0) { + std::cmp::Ordering::Greater => { + total_tokens += tokens as i64; + } + std::cmp::Ordering::Less => { + // Log negative token values for debugging + info!( + "Warning: Session {} has negative accumulated_total_tokens: {}", + session.id, tokens + ); + } + std::cmp::Ordering::Equal => { + // Zero tokens, no action needed + } } } diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs index 928cf8bd0845..d1886e1b3504 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools.rs @@ -96,13 +96,32 @@ fn prepare_command_params( pub async fn create_sub_recipe_task(sub_recipe: &SubRecipe, params: Value) -> Result { let command_params = prepare_command_params(sub_recipe, params)?; + + // Extract task_timeout from sub_recipe values if present + let task_timeout = sub_recipe + .values + .as_ref() + .and_then(|values| values.get("task_timeout")) + .and_then(|timeout_str| timeout_str.parse::().ok()); + + let mut sub_recipe_data = json!({ + "name": sub_recipe.name.clone(), + "command_parameters": command_params, + "recipe_path": sub_recipe.path.clone(), + }); + + // Add task_timeout to the payload if present + if let Some(timeout_seconds) = task_timeout { + sub_recipe_data.as_object_mut().unwrap().insert( + "task_timeout".to_string(), + json!(timeout_seconds) + ); + } + let payload = json!({ - "sub_recipe": { - "name": sub_recipe.name.clone(), - "command_parameters": command_params, - "recipe_path": sub_recipe.path.clone(), - } + "sub_recipe": sub_recipe_data }); + let task = Task { id: uuid::Uuid::new_v4().to_string(), task_type: "sub_recipe".to_string(), diff --git a/crates/goose/src/agents/recipe_tools/sub_recipe_tools/tests.rs b/crates/goose/src/agents/recipe_tools/sub_recipe_tools/tests.rs index 11ce390a6b3b..cb2e3e971a10 100644 --- a/crates/goose/src/agents/recipe_tools/sub_recipe_tools/tests.rs +++ b/crates/goose/src/agents/recipe_tools/sub_recipe_tools/tests.rs @@ -152,4 +152,67 @@ mod tests { assert_eq!(result["required"][0], "key1"); } } + + mod create_sub_recipe_task_tests { + use super::*; + use crate::agents::recipe_tools::sub_recipe_tools::create_sub_recipe_task; + use crate::agents::sub_recipe_execution_tool::lib::Task; + use serde_json::json; + + #[tokio::test] + async fn test_create_sub_recipe_task_with_timeout() { + let mut sub_recipe = setup_sub_recipe(); + sub_recipe.values = Some(HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("task_timeout".to_string(), "3600".to_string()), + ])); + + let params = json!({ + "key2": "value2" + }); + + let result = create_sub_recipe_task(&sub_recipe, params).await.unwrap(); + let task: Task = serde_json::from_str(&result).unwrap(); + + assert_eq!(task.task_type, "sub_recipe"); + let sub_recipe_obj = task.payload.get("sub_recipe").unwrap(); + assert_eq!(sub_recipe_obj.get("name").unwrap(), "test_sub_recipe"); + assert_eq!(sub_recipe_obj.get("task_timeout").unwrap(), 3600); + + let command_params = sub_recipe_obj.get("command_parameters").unwrap(); + assert_eq!(command_params.get("key1").unwrap(), "value1"); + assert_eq!(command_params.get("key2").unwrap(), "value2"); + } + + #[tokio::test] + async fn test_create_sub_recipe_task_without_timeout() { + let sub_recipe = setup_sub_recipe(); + let params = json!({ + "key2": "value2" + }); + + let result = create_sub_recipe_task(&sub_recipe, params).await.unwrap(); + let task: Task = serde_json::from_str(&result).unwrap(); + + assert_eq!(task.task_type, "sub_recipe"); + let sub_recipe_obj = task.payload.get("sub_recipe").unwrap(); + assert!(sub_recipe_obj.get("task_timeout").is_none()); + } + + #[tokio::test] + async fn test_create_sub_recipe_task_invalid_timeout() { + let mut sub_recipe = setup_sub_recipe(); + sub_recipe.values = Some(HashMap::from([ + ("task_timeout".to_string(), "not_a_number".to_string()), + ])); + + let params = json!({}); + + let result = create_sub_recipe_task(&sub_recipe, params).await.unwrap(); + let task: Task = serde_json::from_str(&result).unwrap(); + + let sub_recipe_obj = task.payload.get("sub_recipe").unwrap(); + assert!(sub_recipe_obj.get("task_timeout").is_none()); + } + } } diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/executor.rs b/crates/goose/src/agents/sub_recipe_execution_tool/executor.rs index b796d412984d..91c29fee4a93 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/executor.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/executor.rs @@ -9,9 +9,14 @@ use crate::agents::sub_recipe_execution_tool::lib::{ use crate::agents::sub_recipe_execution_tool::tasks::process_task; use crate::agents::sub_recipe_execution_tool::workers::{run_scaler, spawn_worker, SharedState}; +#[cfg(test)] +mod tests; + pub async fn execute_single_task(task: &Task, config: Config) -> ExecutionResponse { let start_time = Instant::now(); - let result = process_task(task, config.timeout_seconds).await; + // Use task-specific timeout if available, otherwise use default + let timeout_seconds = task.get_task_timeout().unwrap_or(config.timeout_seconds); + let result = process_task(task, timeout_seconds).await; let execution_time = start_time.elapsed().as_millis(); let completed = if result.status == "success" { 1 } else { 0 }; diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/executor/tests.rs b/crates/goose/src/agents/sub_recipe_execution_tool/executor/tests.rs new file mode 100644 index 000000000000..2ef14883b22e --- /dev/null +++ b/crates/goose/src/agents/sub_recipe_execution_tool/executor/tests.rs @@ -0,0 +1,58 @@ +#[cfg(test)] +mod tests { + use super::super::*; + use crate::agents::sub_recipe_execution_tool::types::Task; + use serde_json::json; + + #[tokio::test] + async fn test_execute_single_task_with_custom_timeout() { + // Create a task with custom timeout + let task = Task { + id: "test-id".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test", + "task_timeout": 120, + "command_parameters": {}, + "recipe_path": "test.yaml" + } + }), + }; + + let config = Config { + timeout_seconds: 60, // Default timeout + max_workers: 10, + initial_workers: 2, + }; + + // The timeout extraction logic is tested here + // We verify that get_task_timeout returns the expected value + assert_eq!(task.get_task_timeout(), Some(120)); + } + + #[tokio::test] + async fn test_execute_single_task_default_timeout() { + // Create a task without custom timeout + let task = Task { + id: "test-id".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test", + "command_parameters": {}, + "recipe_path": "test.yaml" + } + }), + }; + + let config = Config { + timeout_seconds: 60, // Default timeout + max_workers: 10, + initial_workers: 2, + }; + + // Should return None when no timeout is specified + assert_eq!(task.get_task_timeout(), None); + } +} \ No newline at end of file diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs b/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs index 4e4584aa0b34..f26e842f233d 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/tasks.rs @@ -7,6 +7,9 @@ use tokio::time::timeout; use crate::agents::sub_recipe_execution_tool::types::{Task, TaskResult}; +#[cfg(test)] +mod tests; + // Process a single task based on its type pub async fn process_task(task: &Task, timeout_seconds: u64) -> TaskResult { let task_clone = task.clone(); diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/tasks/tests.rs b/crates/goose/src/agents/sub_recipe_execution_tool/tasks/tests.rs new file mode 100644 index 000000000000..854e8ad9b952 --- /dev/null +++ b/crates/goose/src/agents/sub_recipe_execution_tool/tasks/tests.rs @@ -0,0 +1,39 @@ +#[cfg(test)] +mod tests { + use super::super::*; + use crate::agents::sub_recipe_execution_tool::types::Task; + use serde_json::json; + + #[test] + fn test_task_timeout_extraction() { + // Test that the task timeout is properly extracted and used + let task_with_timeout = Task { + id: "test-id".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test", + "task_timeout": 180, + "command_parameters": {}, + "recipe_path": "test.yaml" + } + }), + }; + + assert_eq!(task_with_timeout.get_task_timeout(), Some(180)); + + let task_without_timeout = Task { + id: "test-id-2".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test", + "command_parameters": {}, + "recipe_path": "test.yaml" + } + }), + }; + + assert_eq!(task_without_timeout.get_task_timeout(), None); + } +} \ No newline at end of file diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/types.rs b/crates/goose/src/agents/sub_recipe_execution_tool/types.rs index ede71dbf40b4..2533c2cd4451 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/types.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/types.rs @@ -9,6 +9,29 @@ pub struct Task { pub payload: Value, } +impl Task { + /// Extract task-specific timeout from the task payload + pub fn get_task_timeout(&self) -> Option { + // For sub_recipe tasks, get timeout from sub_recipe object + if self.task_type == "sub_recipe" { + self.payload + .get("sub_recipe") + .and_then(|sr| sr.get("task_timeout")) + .and_then(|timeout| timeout.as_u64()) + } else { + // For text_instruction tasks, check if there's a sub_recipe field with timeout + self.payload + .get("sub_recipe") + .and_then(|sr| sr.as_object()) + .and_then(|sr| sr.get("task_timeout")) + .and_then(|timeout| timeout.as_u64()) + } + } +} + +#[cfg(test)] +mod tests; + // Result for each task #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TaskResult { diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/types/tests.rs b/crates/goose/src/agents/sub_recipe_execution_tool/types/tests.rs new file mode 100644 index 000000000000..70acbbfdb5a4 --- /dev/null +++ b/crates/goose/src/agents/sub_recipe_execution_tool/types/tests.rs @@ -0,0 +1,85 @@ +#[cfg(test)] +mod tests { + use super::super::*; + use serde_json::json; + + #[test] + fn test_get_task_timeout_sub_recipe() { + let task = Task { + id: "test-id".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test", + "task_timeout": 3600 + } + }), + }; + + assert_eq!(task.get_task_timeout(), Some(3600)); + } + + #[test] + fn test_get_task_timeout_text_instruction() { + let task = Task { + id: "test-id".to_string(), + task_type: "text_instruction".to_string(), + payload: json!({ + "instruction": "test", + "sub_recipe": { + "task_timeout": 1800 + } + }), + }; + + assert_eq!(task.get_task_timeout(), Some(1800)); + } + + #[test] + fn test_get_task_timeout_no_timeout() { + let task = Task { + id: "test-id".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test" + } + }), + }; + + assert_eq!(task.get_task_timeout(), None); + } + + #[test] + fn test_get_task_timeout_invalid_type() { + let task = Task { + id: "test-id".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test", + "task_timeout": "not_a_number" + } + }), + }; + + assert_eq!(task.get_task_timeout(), None); + } + + #[test] + fn test_get_task_timeout_negative_number() { + let task = Task { + id: "test-id".to_string(), + task_type: "sub_recipe".to_string(), + payload: json!({ + "sub_recipe": { + "name": "test", + "task_timeout": -100 + } + }), + }; + + // Negative numbers can't be converted to u64 + assert_eq!(task.get_task_timeout(), None); + } +} \ No newline at end of file diff --git a/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs b/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs index e48f19c4d360..3a98a7106ac4 100644 --- a/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs +++ b/crates/goose/src/agents/sub_recipe_execution_tool/workers.rs @@ -61,7 +61,7 @@ pub fn spawn_worker( }) } -async fn worker_loop(state: Arc, _worker_id: usize, timeout_seconds: u64) { +async fn worker_loop(state: Arc, _worker_id: usize, default_timeout_seconds: u64) { loop { // Try to receive a task let task = { @@ -71,6 +71,9 @@ async fn worker_loop(state: Arc, _worker_id: usize, timeout_seconds match task { Some(task) => { + // Use task-specific timeout if available, otherwise use default + let timeout_seconds = task.get_task_timeout().unwrap_or(default_timeout_seconds); + // Process the task let result = process_task(&task, timeout_seconds).await; diff --git a/crates/goose/src/providers/formats/openai.rs b/crates/goose/src/providers/formats/openai.rs index 7660afe12748..a8ca822ddbac 100644 --- a/crates/goose/src/providers/formats/openai.rs +++ b/crates/goose/src/providers/formats/openai.rs @@ -428,56 +428,83 @@ where if chunk.choices.is_empty() { yield (None, usage) } else if let Some(tool_calls) = &chunk.choices[0].delta.tool_calls { - let tool_call = &tool_calls[0]; - let id = tool_call.id.clone().ok_or(anyhow!("No tool call ID"))?; - let function_name = tool_call.function.name.clone().ok_or(anyhow!("No function name"))?; - let mut arguments = tool_call.function.arguments.clone(); - - while let Some(response_chunk) = stream.next().await { - if response_chunk.as_ref().is_ok_and(|s| s == "data: [DONE]") { - break 'outer; + let mut tool_call_data: std::collections::HashMap = std::collections::HashMap::new(); + + for tool_call in tool_calls { + if let (Some(index), Some(id), Some(name)) = (tool_call.index, &tool_call.id, &tool_call.function.name) { + tool_call_data.insert(index, (id.clone(), name.clone(), tool_call.function.arguments.clone())); } - let response_str = response_chunk?; - if let Some(line) = strip_data_prefix(&response_str) { - let tool_chunk: StreamingChunk = serde_json::from_str(line) - .map_err(|e| anyhow!("Failed to parse streaming chunk: {}: {:?}", e, &line))?; - let more_args = tool_chunk.choices[0].delta.tool_calls.as_ref() - .and_then(|calls| calls.first()) - .map(|call| call.function.arguments.as_str()); - if let Some(more_args) = more_args { - arguments.push_str(more_args); - } else { - break; + } + + let mut done = false; + while !done { + if let Some(response_chunk) = stream.next().await { + if response_chunk.as_ref().is_ok_and(|s| s == "data: [DONE]") { + break 'outer; } + let response_str = response_chunk?; + if let Some(line) = strip_data_prefix(&response_str) { + let tool_chunk: StreamingChunk = serde_json::from_str(line) + .map_err(|e| anyhow!("Failed to parse streaming chunk: {}: {:?}", e, &line))?; + + if let Some(delta_tool_calls) = &tool_chunk.choices[0].delta.tool_calls { + for delta_call in delta_tool_calls { + if let Some(index) = delta_call.index { + if let Some((_, _, ref mut args)) = tool_call_data.get_mut(&index) { + args.push_str(&delta_call.function.arguments); + } else if let (Some(id), Some(name)) = (&delta_call.id, &delta_call.function.name) { + tool_call_data.insert(index, (id.clone(), name.clone(), delta_call.function.arguments.clone())); + } + } + } + } else { + done = true; + } + + if tool_chunk.choices[0].finish_reason == Some("tool_calls".to_string()) { + done = true; + } + } + } else { + break; } } - let parsed = if arguments.is_empty() { - Ok(json!({})) - } else { - serde_json::from_str::(&arguments) - }; - - let content = match parsed { - Ok(params) => MessageContent::tool_request( - id, - Ok(ToolCall::new(function_name, params)), - ), - Err(e) => { - let error = ToolError::InvalidParameters(format!( - "Could not interpret tool use parameters for id {}: {}", - id, e - )); - MessageContent::tool_request(id, Err(error)) + let mut contents = Vec::new(); + let mut sorted_indices: Vec<_> = tool_call_data.keys().cloned().collect(); + sorted_indices.sort(); + + for index in sorted_indices { + if let Some((id, function_name, arguments)) = tool_call_data.get(&index) { + let parsed = if arguments.is_empty() { + Ok(json!({})) + } else { + serde_json::from_str::(arguments) + }; + + let content = match parsed { + Ok(params) => MessageContent::tool_request( + id.clone(), + Ok(ToolCall::new(function_name.clone(), params)), + ), + Err(e) => { + let error = ToolError::InvalidParameters(format!( + "Could not interpret tool use parameters for id {}: {}", + id, e + )); + MessageContent::tool_request(id.clone(), Err(error)) + } + }; + contents.push(content); } - }; + } yield ( Some(Message { id: chunk.id, role: Role::Assistant, created: chrono::Utc::now().timestamp(), - content: vec![content], + content: contents, }), usage, ) @@ -601,6 +628,8 @@ mod tests { use super::*; use mcp_core::content::Content; use serde_json::json; + use tokio::pin; + use tokio_stream::{self, StreamExt}; #[test] fn test_validate_tool_schemas() { @@ -1088,4 +1117,54 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_streamed_multi_tool_response_to_messages() -> anyhow::Result<()> { + let response_lines = r#" +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":"I'll run both"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" `ls` commands in a"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" single turn for you -"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":" one on the current directory an"},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":"d one on the `working_dir`."},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288340} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"id":"toolu_bdrk_01RMTd7R9DzQjEEWgDwzcBsU","type":"function","function":{"name":"developer__shell","arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"{\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"command\": \"l"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":1,"function":{"arguments":"s\"}"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"id":"toolu_bdrk_016bgVTGZdpjP8ehjMWp9cWW","type":"function","function":{"name":"developer__shell","arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288341} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"{\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"command\""}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":": \"ls wor"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"king_dir"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":null,"tool_calls":[{"index":2,"function":{"arguments":"\"}"}}]},"index":0,"finish_reason":null}],"usage":{"prompt_tokens":4982,"completion_tokens":null,"total_tokens":null},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342} +data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":{"role":"assistant","content":""},"index":0,"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":4982,"completion_tokens":122,"total_tokens":5104},"object":"chat.completion.chunk","id":"msg_bdrk_014pifLTHsNZz6Lmtw1ywgDJ","created":1753288342} +data: [DONE] +"#; + + let response_stream = + tokio_stream::iter(response_lines.lines().map(|line| Ok(line.to_string()))); + let messages = response_to_streaming_message(response_stream); + pin!(messages); + + while let Some(Ok((message, _usage))) = messages.next().await { + if let Some(msg) = message { + println!("{:?}", msg); + if msg.content.len() == 2 { + if let (MessageContent::ToolRequest(req1), MessageContent::ToolRequest(req2)) = + (&msg.content[0], &msg.content[1]) + { + if req1.tool_call.is_ok() && req2.tool_call.is_ok() { + // We expect two tool calls in the response + assert_eq!(req1.tool_call.as_ref().unwrap().name, "developer__shell"); + assert_eq!(req2.tool_call.as_ref().unwrap().name, "developer__shell"); + return Ok(()); + } + } + } + } + } + + panic!("Expected tool call message with two calls, but did not see it"); + } } diff --git a/crates/goose/src/session/info.rs b/crates/goose/src/session/info.rs index a772af05424e..6c60d3310dba 100644 --- a/crates/goose/src/session/info.rs +++ b/crates/goose/src/session/info.rs @@ -26,29 +26,56 @@ pub fn get_valid_sorted_sessions(sort_order: SortOrder) -> Result = sessions - .into_iter() - .filter_map(|(id, path)| { - let modified = path - .metadata() - .and_then(|m| m.modified()) - .map(|time| { - chrono::DateTime::::from(time) - .format("%Y-%m-%d %H:%M:%S UTC") - .to_string() - }) - .ok()?; - - let metadata = session::read_metadata(&path).ok()?; - - Some(SessionInfo { - id, - path: path.to_string_lossy().to_string(), - modified, - metadata, + + let mut session_infos: Vec = Vec::new(); + let mut corrupted_count = 0; + + for (id, path) in sessions { + // Get file modification time with fallback + let modified = path + .metadata() + .and_then(|m| m.modified()) + .map(|time| { + chrono::DateTime::::from(time) + .format("%Y-%m-%d %H:%M:%S UTC") + .to_string() }) - }) - .collect(); + .unwrap_or_else(|_| { + tracing::warn!("Failed to get modification time for session: {}", id); + "Unknown".to_string() + }); + + // Try to read metadata with error handling + match session::read_metadata(&path) { + Ok(metadata) => { + session_infos.push(SessionInfo { + id, + path: path.to_string_lossy().to_string(), + modified, + metadata, + }); + } + Err(e) => { + corrupted_count += 1; + tracing::warn!( + "Failed to read metadata for session '{}': {}. Skipping corrupted session.", + id, + e + ); + + // Optionally, we could create a placeholder entry for corrupted sessions + // to show them in the UI with an error indicator, but for now we skip them + continue; + } + } + } + + if corrupted_count > 0 { + tracing::warn!( + "Skipped {} corrupted sessions during listing", + corrupted_count + ); + } // Sort sessions by modified date // Since all dates are in ISO format (YYYY-MM-DD HH:MM:SS UTC), we can just use string comparison @@ -70,3 +97,42 @@ pub fn get_valid_sorted_sessions(sort_order: SortOrder) -> Result Result { /// /// Security features: /// - Validates file paths to prevent directory traversal -/// - Uses secure file operations via persist_messages_with_schedule_id pub async fn persist_messages( session_file: &Path, messages: &[Message], provider: Option>, + working_dir: Option, ) -> Result<()> { - persist_messages_with_schedule_id(session_file, messages, provider, None).await + persist_messages_with_schedule_id(session_file, messages, provider, None, working_dir).await } /// Write messages to a session file with metadata, including an optional scheduled job ID @@ -1065,6 +1065,7 @@ pub async fn persist_messages_with_schedule_id( messages: &[Message], provider: Option>, schedule_id: Option, + working_dir: Option, ) -> Result<()> { // Validate the session file path for security let secure_path = get_path(Identifier::Path(session_file.to_path_buf()))?; @@ -1085,16 +1086,35 @@ pub async fn persist_messages_with_schedule_id( match provider { Some(provider) if user_message_count < 4 => { //generate_description is responsible for writing the messages - generate_description_with_schedule_id(&secure_path, messages, provider, schedule_id) - .await + generate_description_with_schedule_id( + &secure_path, + messages, + provider, + schedule_id, + working_dir, + ) + .await } _ => { - // Read existing metadata - let mut metadata = read_metadata(&secure_path)?; + // Read existing metadata or create new with proper working_dir + let mut metadata = if secure_path.exists() { + read_metadata(&secure_path)? + } else { + // Create new metadata with the provided working_dir or fall back to home + let work_dir = working_dir.clone().unwrap_or_else(get_home_dir); + SessionMetadata::new(work_dir) + }; + + // Update the working_dir if provided (even for existing files) + if let Some(work_dir) = working_dir { + metadata.working_dir = work_dir; + } + // Update the schedule_id if provided if schedule_id.is_some() { metadata.schedule_id = schedule_id; } + // Write the file with metadata and messages save_messages_with_metadata(&secure_path, &metadata, messages) } @@ -1232,11 +1252,12 @@ pub async fn generate_description( session_file: &Path, messages: &[Message], provider: Arc, + working_dir: Option, ) -> Result<()> { - generate_description_with_schedule_id(session_file, messages, provider, None).await + generate_description_with_schedule_id(session_file, messages, provider, None, working_dir).await } -/// Generate a description for the session using the provider, including an optional scheduled job ID +/// Generate a description for the session using the provider, including an optional scheduled job ID and working directory /// /// This function is called when appropriate to generate a short description /// of the session based on the conversation history. @@ -1250,6 +1271,7 @@ pub async fn generate_description_with_schedule_id( messages: &[Message], provider: Arc, schedule_id: Option, + working_dir: Option, ) -> Result<()> { // Validate the path for security let secure_path = get_path(Identifier::Path(session_file.to_path_buf()))?; @@ -1311,7 +1333,14 @@ pub async fn generate_description_with_schedule_id( description }; - let mut metadata = read_metadata(&secure_path)?; + // Create metadata with proper working_dir or read existing and update + let mut metadata = if secure_path.exists() { + read_metadata(&secure_path)? + } else { + // Create new metadata with the provided working_dir or fall back to home + let work_dir = working_dir.clone().unwrap_or_else(get_home_dir); + SessionMetadata::new(work_dir) + }; // Update description and schedule_id metadata.description = sanitized_description; @@ -1319,6 +1348,11 @@ pub async fn generate_description_with_schedule_id( metadata.schedule_id = schedule_id; } + // Update the working_dir if provided (even for existing files) + if let Some(work_dir) = working_dir { + metadata.working_dir = work_dir; + } + // Update the file with the new metadata and existing messages save_messages_with_metadata(&secure_path, &metadata, messages) } @@ -1430,7 +1464,7 @@ mod tests { ]; // Write messages - persist_messages(&file_path, &messages, None).await?; + persist_messages(&file_path, &messages, None, None).await?; // Read them back let read_messages = read_messages(&file_path)?; @@ -1538,7 +1572,7 @@ mod tests { } // Write messages with special characters - persist_messages(&file_path, &messages, None).await?; + persist_messages(&file_path, &messages, None, None).await?; // Read them back let read_messages = read_messages(&file_path)?; @@ -1603,7 +1637,7 @@ mod tests { ]; // Write messages - persist_messages(&file_path, &messages, None).await?; + persist_messages(&file_path, &messages, None, None).await?; // Read them back - should be truncated let read_messages = read_messages(&file_path)?; @@ -1694,6 +1728,162 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_working_dir_preservation() -> Result<()> { + let dir = tempdir()?; + let file_path = dir.path().join("test.jsonl"); + + // Create a temporary working directory + let working_dir = tempdir()?; + let working_dir_path = working_dir.path().to_path_buf(); + + // Create messages + let messages = vec![Message::user().with_text("test message")]; + + // Use persist_messages_with_schedule_id to set working dir + persist_messages_with_schedule_id( + &file_path, + &messages, + None, + None, + Some(working_dir_path.clone()), + ) + .await?; + + // Read back the metadata and verify working_dir is preserved + let metadata = read_metadata(&file_path)?; + assert_eq!(metadata.working_dir, working_dir_path); + + // Verify the messages are also preserved + let read_messages = read_messages(&file_path)?; + assert_eq!(read_messages.len(), 1); + assert_eq!(read_messages[0].role, messages[0].role); + + Ok(()) + } + + #[tokio::test] + async fn test_working_dir_issue_fixed() -> Result<()> { + // This test demonstrates that the working_dir issue in jsonl files is fixed + let dir = tempdir()?; + let file_path = dir.path().join("test.jsonl"); + + // Create a temporary working directory (this simulates the actual working directory) + let working_dir = tempdir()?; + let working_dir_path = working_dir.path().to_path_buf(); + + // Create messages + let messages = vec![Message::user().with_text("test message")]; + + // Get the home directory for comparison + let home_dir = get_home_dir(); + + // Test 1: Using the old persist_messages function (without working_dir) + // This will fall back to home directory since no working_dir is provided + persist_messages(&file_path, &messages, None, None).await?; + + // Read back the metadata - this should now have the home directory as working_dir + let metadata_old = read_metadata(&file_path)?; + assert_eq!( + metadata_old.working_dir, home_dir, + "persist_messages should use home directory when no working_dir is provided" + ); + + // Test 2: Using persist_messages_with_schedule_id function + // This should properly set the working_dir (this is the main fix) + persist_messages_with_schedule_id( + &file_path, + &messages, + None, + None, + Some(working_dir_path.clone()), + ) + .await?; + + // Read back the metadata - this should now have the correct working_dir + let metadata_new = read_metadata(&file_path)?; + assert_eq!( + metadata_new.working_dir, working_dir_path, + "persist_messages_with_schedule_id should use provided working_dir" + ); + assert_ne!( + metadata_new.working_dir, home_dir, + "working_dir should be different from home directory" + ); + + // Test 3: Create a new session file without working_dir (should fall back to home) + let file_path_2 = dir.path().join("test2.jsonl"); + persist_messages_with_schedule_id( + &file_path_2, + &messages, + None, + None, + None, // No working_dir provided + ) + .await?; + + let metadata_fallback = read_metadata(&file_path_2)?; + assert_eq!(metadata_fallback.working_dir, home_dir, "persist_messages_with_schedule_id should fall back to home directory when no working_dir is provided"); + + // Test 4: Test that the fix works for existing files + // Create a session file and then add to it with different working_dir + let file_path_3 = dir.path().join("test3.jsonl"); + + // First, create with home directory + persist_messages(&file_path_3, &messages, None, None).await?; + let metadata_initial = read_metadata(&file_path_3)?; + assert_eq!( + metadata_initial.working_dir, home_dir, + "Initial session should use home directory" + ); + + // Then update with a specific working_dir + persist_messages_with_schedule_id( + &file_path_3, + &messages, + None, + None, + Some(working_dir_path.clone()), + ) + .await?; + + let metadata_updated = read_metadata(&file_path_3)?; + assert_eq!( + metadata_updated.working_dir, working_dir_path, + "Updated session should use new working_dir" + ); + + // Test 5: Most important test - simulate the real-world scenario where + // CLI and web interfaces pass the current directory instead of None + let file_path_4 = dir.path().join("test4.jsonl"); + let current_dir = std::env::current_dir()?; + + // This is what web.rs and session/mod.rs do now after the fix + persist_messages_with_schedule_id( + &file_path_4, + &messages, + None, + None, + Some(current_dir.clone()), + ) + .await?; + + let metadata_current = read_metadata(&file_path_4)?; + assert_eq!( + metadata_current.working_dir, current_dir, + "Session should use current directory when explicitly provided" + ); + // This should NOT be the home directory anymore (unless current_dir == home_dir) + if current_dir != home_dir { + assert_ne!( + metadata_current.working_dir, home_dir, + "working_dir should be different from home directory when current_dir is different" + ); + } + + Ok(()) + } + #[test] fn test_windows_path_validation() -> Result<()> { // Test the Windows path validation logic @@ -1781,12 +1971,13 @@ mod tests { Message::assistant().with_text("Test response"), ]; - // Test persist_messages_with_schedule_id with save_session = true + // Test persist_messages_with_schedule_id with working_dir parameter persist_messages_with_schedule_id( &file_path, &messages, None, Some("test_schedule".to_string()), + None, ) .await?; diff --git a/ui/desktop/index.html b/ui/desktop/index.html index 3f8c17512877..f606653e47eb 100644 --- a/ui/desktop/index.html +++ b/ui/desktop/index.html @@ -2,7 +2,7 @@ - + Goose