From 7e576c5d1a308f46df7030dc0208b6746ba287c5 Mon Sep 17 00:00:00 2001 From: rabi Date: Wed, 31 Dec 2025 13:35:58 +0530 Subject: [PATCH] fix: make tool_call arguments optional and fix silent stream termination Some LLM providers (e.g., vLLM with QwQ models) send tool_call chunks where the first chunk contains only the function name without arguments, and subsequent chunks contain the arguments. This caused deserialization to fail with 'missing field arguments'. The deserialization error was silently treated as end-of-stream because 'while let Some(Ok(...))' doesn't match 'Some(Err(...))' - it just exits the loop as if the stream had completed normally. Changes: - Added #[serde(default)] to DeltaToolCallFunction.arguments in openai.rs to handle missing arguments by defaulting to empty string - Fixed stream consumption pattern: changed 'while let Some(Ok(...))' to 'while let Some(result)' with explicit error propagation via '?' - Added test for stream error propagation Signed-off-by: rabi --- crates/goose/src/agents/reply_parts.rs | 44 +++++++++++++++++++- crates/goose/src/providers/formats/openai.rs | 5 ++- 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/crates/goose/src/agents/reply_parts.rs b/crates/goose/src/agents/reply_parts.rs index cd00a07d07c4..bb9e47f01a10 100644 --- a/crates/goose/src/agents/reply_parts.rs +++ b/crates/goose/src/agents/reply_parts.rs @@ -235,7 +235,9 @@ impl Agent { }; Ok(Box::pin(try_stream! 
{ - while let Some(Ok((mut message, usage))) = stream.next().await { + while let Some(result) = stream.next().await { + let (mut message, usage) = result?; + // Store the model information in the global store if let Some(usage) = usage.as_ref() { crate::providers::base::set_current_model(&usage.model); @@ -483,4 +485,44 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_stream_error_propagation() { + use futures::StreamExt; + + type StreamItem = Result<(Option<Message>, Option<ProviderUsage>), ProviderError>; + let stream = futures::stream::iter(vec![ + Ok((Some(Message::assistant().with_text("chunk1")), None)), + Ok((Some(Message::assistant().with_text("chunk2")), None)), + Err(ProviderError::RequestFailed( + "simulated stream error".to_string(), + )), + ] as Vec<StreamItem>); + + let mut pinned = Box::pin(stream); + let mut results = Vec::new(); + let mut error_seen = false; + + while let Some(result) = pinned.next().await { + match result { + Ok((message, _usage)) => { + if let Some(msg) = message { + results.push(msg.as_concat_text()); + } + } + Err(_e) => { + error_seen = true; + break; + } + } + } + + assert_eq!(results.len(), 2); + assert_eq!(results[0], "chunk1"); + assert_eq!(results[1], "chunk2"); + assert!( + error_seen, + "Error should have been propagated, not silently ignored" + ); + } } diff --git a/crates/goose/src/providers/formats/openai.rs b/crates/goose/src/providers/formats/openai.rs index 937d9658ffde..1db03270fedf 100644 --- a/crates/goose/src/providers/formats/openai.rs +++ b/crates/goose/src/providers/formats/openai.rs @@ -18,10 +18,11 @@ use serde_json::{json, Value}; use std::borrow::Cow; use std::ops::Deref; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Default)] struct DeltaToolCallFunction { name: Option<String>, - arguments: String, // chunk of encoded JSON, + #[serde(default)] + arguments: String, } #[derive(Serialize, Deserialize, Debug)]