Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 55 additions & 109 deletions crates/goose/src/providers/formats/google.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,38 +227,6 @@ enum SignedTextHandling {
SignedTextAsRegularText,
}

pub fn process_response_part(
part: &Value,
last_signature: &mut Option<String>,
) -> Option<MessageContent> {
let has_signature = part.get(THOUGHT_SIGNATURE_KEY).is_some();
let handling = if has_signature {
SignedTextHandling::SignedTextAsThinking
} else {
SignedTextHandling::SignedTextAsRegularText
};
process_response_part_impl(part, last_signature, handling)
}

/// Gemini 2.x includes thoughtSignature on first chunk as metadata, not actual thinking.
fn process_response_part_for_model(
part: &Value,
last_signature: &mut Option<String>,
model_version: Option<&str>,
) -> Option<MessageContent> {
let is_gemini_2 = model_version
.map(|m| m.starts_with("gemini-2"))
.unwrap_or(false);

let has_signature = part.get(THOUGHT_SIGNATURE_KEY).is_some();
let handling = if has_signature && !is_gemini_2 {
SignedTextHandling::SignedTextAsThinking
} else {
SignedTextHandling::SignedTextAsRegularText
};
process_response_part_impl(part, last_signature, handling)
}

fn process_response_part_non_streaming(
part: &Value,
last_signature: &mut Option<String>,
Expand Down Expand Up @@ -488,8 +456,6 @@ where
}
}

let model_version = chunk.get("modelVersion").and_then(|v| v.as_str());

let parts = chunk
.get("candidates")
.and_then(|v| v.as_array())
Expand All @@ -500,7 +466,9 @@ where

if let Some(parts) = parts {
for part in parts {
if let Some(content) = process_response_part_for_model(part, &mut last_signature, model_version) {
// Always emit text as regular text during streaming — we can't
// know yet whether function calls will follow.
if let Some(content) = process_response_part_impl(part, &mut last_signature, SignedTextHandling::SignedTextAsRegularText) {
let message = Message::new(
Role::Assistant,
chrono::Utc::now().timestamp(),
Expand Down Expand Up @@ -1192,90 +1160,68 @@ mod tests {
async fn test_streaming_with_thought_signature() {
use futures::StreamExt;

let gemini3_stream = concat!(
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": "Begin", "thoughtSignature": "sig123"}]}}], "#,
r#""modelVersion": "gemini-3-pro"}"#,
"\n",
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": " end"}]}}], "modelVersion": "gemini-3-pro"}"#
);
let lines: Vec<Result<String, anyhow::Error>> =
gemini3_stream.lines().map(|l| Ok(l.to_string())).collect();
let stream = Box::pin(futures::stream::iter(lines));
let mut message_stream = std::pin::pin!(response_to_streaming_message(stream));

let mut text_parts = Vec::new();
let mut thinking_parts = Vec::new();

while let Some(result) = message_stream.next().await {
let (message, _usage) = result.unwrap();
if let Some(msg) = message {
match msg.content.first() {
Some(MessageContent::Text(text)) => text_parts.push(text.text.clone()),
Some(MessageContent::Thinking(t)) => thinking_parts.push(t.thinking.clone()),
_ => {}
async fn collect_streaming_text(raw: &str) -> (String, usize) {
let lines: Vec<Result<String, anyhow::Error>> =
raw.lines().map(|l| Ok(l.to_string())).collect();
let stream = Box::pin(futures::stream::iter(lines));
let mut msg_stream = std::pin::pin!(response_to_streaming_message(stream));
let mut text = String::new();
let mut thinking = 0usize;
while let Some(Ok((message, _))) = msg_stream.next().await {
if let Some(msg) = message {
for c in &msg.content {
match c {
MessageContent::Text(t) => text.push_str(&t.text),
MessageContent::Thinking(_) => thinking += 1,
_ => {}
}
}
}
}
(text, thinking)
}

assert_eq!(thinking_parts, vec!["Begin"]);
assert_eq!(text_parts, vec![" end"]);

let gemini25_stream = concat!(
// First chunk signed
let (text, thinking) = collect_streaming_text(concat!(
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": "Begin", "thoughtSignature": "sig123"}]}}], "#,
r#""modelVersion": "gemini-2.5-pro"}"#,
r#""parts": [{"text": "Hello", "thoughtSignature": "sig1"}]}}], "#,
r#""modelVersion": "gemini-3-flash-preview"}"#,
"\n",
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": " end"}]}}], "modelVersion": "gemini-2.5-pro"}"#
);
let lines: Vec<Result<String, anyhow::Error>> =
gemini25_stream.lines().map(|l| Ok(l.to_string())).collect();
let stream = Box::pin(futures::stream::iter(lines));
let mut message_stream = std::pin::pin!(response_to_streaming_message(stream));

let mut text_parts = Vec::new();

while let Some(result) = message_stream.next().await {
let (message, _usage) = result.unwrap();
if let Some(msg) = message {
if let Some(MessageContent::Text(text)) = msg.content.first() {
text_parts.push(text.text.clone());
}
}
}

assert_eq!(text_parts, vec!["Begin", " end"]);

let unknown_stream = concat!(
r#""parts": [{"text": " world"}]}}], "modelVersion": "gemini-3-flash-preview"}"#
))
.await;
assert_eq!(thinking, 0);
assert_eq!(text, "Hello world");

// Last chunk signed (the reported truncation bug)
let (text, thinking) = collect_streaming_text(concat!(
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": "Begin", "thoughtSignature": "sig123"}]}}]}"#,
r#""parts": [{"text": "SECURITY.md: Project"}]}}], "#,
r#""modelVersion": "gemini-3-flash-preview"}"#,
"\n",
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": " end"}]}}]}"#
);
let lines: Vec<Result<String, anyhow::Error>> =
unknown_stream.lines().map(|l| Ok(l.to_string())).collect();
let stream = Box::pin(futures::stream::iter(lines));
let mut message_stream = std::pin::pin!(response_to_streaming_message(stream));

let mut text_parts = Vec::new();
let mut thinking_parts = Vec::new();

while let Some(result) = message_stream.next().await {
let (message, _usage) = result.unwrap();
if let Some(msg) = message {
match msg.content.first() {
Some(MessageContent::Text(text)) => text_parts.push(text.text.clone()),
Some(MessageContent::Thinking(t)) => thinking_parts.push(t.thinking.clone()),
_ => {}
}
}
}

assert_eq!(thinking_parts, vec!["Begin"]);
assert_eq!(text_parts, vec![" end"]);
r#""parts": [{"text": " policies.\n\nRead it?", "thoughtSignature": "sig2"}]}}], "#,
r#""modelVersion": "gemini-3-flash-preview"}"#
))
.await;
assert_eq!(thinking, 0);
assert_eq!(text, "SECURITY.md: Project policies.\n\nRead it?");

// Intermediate chunk signed
let (text, thinking) = collect_streaming_text(concat!(
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": "one "}]}}], "modelVersion": "gemini-3-flash-preview"}"#,
"\n",
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": "two ", "thoughtSignature": "sig3"}]}}], "modelVersion": "gemini-3-flash-preview"}"#,
"\n",
r#"data: {"candidates": [{"content": {"role": "model", "#,
r#""parts": [{"text": "three"}]}}], "modelVersion": "gemini-3-flash-preview"}"#
))
.await;
assert_eq!(thinking, 0);
assert_eq!(text, "one two three");
}

#[tokio::test]
Expand Down