Merged
184 changes: 154 additions & 30 deletions crates/goose/src/providers/formats/openai.rs
@@ -393,6 +393,19 @@ pub fn get_usage(usage: &Value) -> Usage {
Usage::new(input_tokens, output_tokens, total_tokens)
}

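/// Builds a `ProviderUsage` from a streaming chunk, returning `Some` only when the chunk
/// carries both a model name and a usage object that already reports output tokens;
/// chunks whose usage lacks completion counts are ignored.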
fn extract_usage_with_output_tokens(chunk: &StreamingChunk) -> Option<ProviderUsage> {
    chunk
        .usage
        .as_ref()
        .and_then(|u| {
            chunk.model.as_ref().map(|model| ProviderUsage {
                usage: get_usage(u),
                model: model.clone(),
            })
        })
        .filter(|u| u.usage.output_tokens.is_some())
}

/// Validates and fixes tool schemas to ensure they have proper parameter structure.
/// If parameters exist, ensures they have properties and required fields, or removes parameters entirely.
pub fn validate_tool_schemas(tools: &mut [Value]) {
@@ -475,14 +488,7 @@ where
}
}

        let usage = chunk.usage.as_ref().and_then(|u| {
            chunk.model.as_ref().map(|model| {
                ProviderUsage {
                    usage: get_usage(u),
                    model: model.clone(),
                }
            })
        });
        let mut usage = extract_usage_with_output_tokens(&chunk);

        if chunk.choices.is_empty() {
            yield (None, usage)
@@ -508,9 +514,14 @@
        }
        let response_str = response_chunk?;
        if let Some(line) = strip_data_prefix(&response_str) {

            let tool_chunk: StreamingChunk = serde_json::from_str(line)
                .map_err(|e| anyhow!("Failed to parse streaming chunk: {}: {:?}", e, &line))?;

            if let Some(chunk_usage) = extract_usage_with_output_tokens(&tool_chunk) {
                usage = Some(chunk_usage);
            }

            if !tool_chunk.choices.is_empty() {
                if let Some(details) = &tool_chunk.choices[0].delta.reasoning_details {
                    accumulated_reasoning.extend(details.iter().cloned());
@@ -1406,6 +1417,69 @@ mod tests {
        Ok(())
    }

    struct StreamingUsageTestResult {
        usage_count: usize,
        usage: Option<ProviderUsage>,
        tool_calls: Vec<String>,
        has_text_content: bool,
    }

    async fn run_streaming_test(response_lines: &str) -> anyhow::Result<StreamingUsageTestResult> {
        let lines: Vec<String> = response_lines.lines().map(|s| s.to_string()).collect();
        let response_stream = tokio_stream::iter(lines.into_iter().map(Ok));
        let messages = response_to_streaming_message(response_stream);
        pin!(messages);

        let mut result = StreamingUsageTestResult {
            usage_count: 0,
            usage: None,
            tool_calls: Vec::new(),
            has_text_content: false,
        };

        while let Some(Ok((message, usage))) = messages.next().await {
            if let Some(u) = usage {
                result.usage_count += 1;
                result.usage = Some(u);
            }
            if let Some(msg) = message {
                for content in &msg.content {
                    match content {
                        MessageContent::ToolRequest(req) => {
                            if let Ok(tool_call) = &req.tool_call {
                                result.tool_calls.push(tool_call.name.to_string());
                            }
                        }
                        MessageContent::Text(text) if !text.text.is_empty() => {
                            result.has_text_content = true;
                        }
                        _ => {}
                    }
                }
            }
        }

        Ok(result)
    }

    fn assert_usage_yielded_once(
        result: &StreamingUsageTestResult,
        expected_input: i32,
        expected_output: i32,
        expected_total: i32,
    ) {
        assert_eq!(
            result.usage_count, 1,
            "Usage should be yielded exactly once, but was yielded {} times",
            result.usage_count
        );

        let usage = result.usage.as_ref().expect("Expected usage to be present");
        assert_eq!(usage.usage.input_tokens, Some(expected_input));
        assert_eq!(usage.usage.output_tokens, Some(expected_output));
        assert_eq!(usage.usage.total_tokens, Some(expected_total));
    }

    #[tokio::test]
Collaborator: I haven't looked at the details since this just feels like a large dump, but I'm not sure we need three tests here to verify that we are now doing the right thing?

Collaborator (Author): The bug is caused by the different structures of the streaming chunks in the API response, so I added tests with data variations for the different providers. This will provide a harness when we change the logic in the future.
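To make that concrete, here is a minimal, self-contained sketch of the pattern the change relies on: keep only a usage report that includes output tokens, and let a later complete report overwrite an earlier partial one so it is surfaced exactly once. The `SketchUsage` type and the token values are invented for illustration and are not the crate's actual `StreamingChunk`/`ProviderUsage` types.

// Hypothetical stand-in types illustrating the "keep the last complete usage" pattern.
#[derive(Debug, Clone, PartialEq)]
struct SketchUsage {
    input_tokens: Option<i32>,
    output_tokens: Option<i32>,
}

// Mirrors the filter in extract_usage_with_output_tokens: a chunk's usage only
// counts once it reports output tokens.
fn complete_usage(chunk_usage: Option<&SketchUsage>) -> Option<SketchUsage> {
    chunk_usage.cloned().filter(|u| u.output_tokens.is_some())
}

fn main() {
    // Three chunk shapes seen across providers: no usage at all, a partial usage
    // without completion counts, and a final chunk carrying the complete figures.
    let chunks: Vec<Option<SketchUsage>> = vec![
        None,
        Some(SketchUsage { input_tokens: Some(7007), output_tokens: None }),
        Some(SketchUsage { input_tokens: Some(7007), output_tokens: Some(49) }),
    ];

    let mut usage: Option<SketchUsage> = None;
    for chunk in &chunks {
        if let Some(u) = complete_usage(chunk.as_ref()) {
            usage = Some(u); // a later complete report replaces any earlier one
        }
    }

    // Only the final, complete usage survives, so it can be reported exactly once.
    assert_eq!(
        usage,
        Some(SketchUsage { input_tokens: Some(7007), output_tokens: Some(49) })
    );
}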

    async fn test_streamed_multi_tool_response_to_messages() -> anyhow::Result<()> {
        let response_lines = r#"
@@ -1430,29 +1504,79 @@ data: {"model":"us.anthropic.claude-sonnet-4-20250514-v1:0","choices":[{"delta":
data: [DONE]
"#;

        let response_stream =
            tokio_stream::iter(response_lines.lines().map(|line| Ok(line.to_string())));
        let messages = response_to_streaming_message(response_stream);
        pin!(messages);
        let result = run_streaming_test(response_lines).await?;
        assert_eq!(
            result.tool_calls.len(),
            2,
            "Expected 2 tool calls, got {}",
            result.tool_calls.len()
        );
        assert!(result
            .tool_calls
            .iter()
            .all(|name| name == "developer__shell"));

        while let Some(Ok((message, _usage))) = messages.next().await {
            if let Some(msg) = message {
                println!("{:?}", msg);
                if msg.content.len() == 2 {
                    if let (MessageContent::ToolRequest(req1), MessageContent::ToolRequest(req2)) =
                        (&msg.content[0], &msg.content[1])
                    {
                        if req1.tool_call.is_ok() && req2.tool_call.is_ok() {
                            // We expect two tool calls in the response
                            assert_eq!(req1.tool_call.as_ref().unwrap().name, "developer__shell");
                            assert_eq!(req2.tool_call.as_ref().unwrap().name, "developer__shell");
                            return Ok(());
                        }
                    }
                }
            }
        }
        assert_usage_yielded_once(&result, 4982, 122, 5104);

        Ok(())
    }

    #[tokio::test]
    async fn test_openrouter_streaming_usage_yielded_once() -> anyhow::Result<()> {
        let response_lines = r#"
data: {"id":"gen-1768896871-9HgAQqS1Z72C6gApaidi","provider":"OpenInference","model":"openai/gpt-oss-120b:free","object":"chat.completion.chunk","created":1768896871,"choices":[{"index":0,"delta":{"role":"assistant","content":"","reasoning":null,"reasoning_details":[]},"finish_reason":null,"native_finish_reason":null,"logprobs":null}]}
data: {"id":"gen-1768896871-9HgAQqS1Z72C6gApaidi","provider":"OpenInference","model":"openai/gpt-oss-120b:free","object":"chat.completion.chunk","created":1768896871,"choices":[{"index":0,"delta":{"role":"assistant","content":"There","reasoning":"","reasoning_details":[]},"finish_reason":null,"native_finish_reason":null,"logprobs":null}]}
data: {"id":"gen-1768896871-9HgAQqS1Z72C6gApaidi","provider":"OpenInference","model":"openai/gpt-oss-120b:free","object":"chat.completion.chunk","created":1768896871,"choices":[{"index":0,"delta":{"role":"assistant","content":" are","reasoning":null,"reasoning_details":[]},"finish_reason":null,"native_finish_reason":null,"logprobs":null}]}
data: {"id":"gen-1768896871-9HgAQqS1Z72C6gApaidi","provider":"OpenInference","model":"openai/gpt-oss-120b:free","object":"chat.completion.chunk","created":1768896871,"choices":[{"index":0,"delta":{"role":"assistant","content":" **47**","reasoning":null,"reasoning_details":[]},"finish_reason":null,"native_finish_reason":null,"logprobs":null}]}
data: {"id":"gen-1768896871-9HgAQqS1Z72C6gApaidi","provider":"OpenInference","model":"openai/gpt-oss-120b:free","object":"chat.completion.chunk","created":1768896871,"choices":[{"index":0,"delta":{"role":"assistant","content":" files.","reasoning":null,"reasoning_details":[]},"finish_reason":null,"native_finish_reason":null,"logprobs":null}]}
data: {"id":"gen-1768896871-9HgAQqS1Z72C6gApaidi","provider":"OpenInference","model":"openai/gpt-oss-120b:free","object":"chat.completion.chunk","created":1768896871,"choices":[{"index":0,"delta":{"role":"assistant","content":"","reasoning":null,"reasoning_details":[]},"finish_reason":"stop","native_finish_reason":"stop","logprobs":null}]}
data: {"id":"gen-1768896871-9HgAQqS1Z72C6gApaidi","provider":"OpenInference","model":"openai/gpt-oss-120b:free","object":"chat.completion.chunk","created":1768896871,"choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null,"native_finish_reason":null,"logprobs":null}],"usage":{"prompt_tokens":7007,"completion_tokens":49,"total_tokens":7056}}
data: [DONE]
"#;

        let result = run_streaming_test(response_lines).await?;

        assert!(result.has_text_content, "Expected text content in response");
        assert_usage_yielded_once(&result, 7007, 49, 7056);

        Ok(())
    }

        panic!("Expected tool call message with two calls, but did not see it");
    #[tokio::test]
    async fn test_openai_gpt5_streaming_usage_yielded_once() -> anyhow::Result<()> {
        let response_lines = r#"
data: {"id":"chatcmpl-Bk9Ye6Y0t9E7bC3DOMxCpW8eJkTKU","object":"chat.completion.chunk","created":1737368310,"model":"gpt-5.2-1106-preview","service_tier":"default","system_fingerprint":"fp_5f325d54e6","choices":[{"index":0,"delta":{"role":"assistant","content":null,"tool_calls":[{"index":0,"id":"call_x4CIvBVfQhYMhyO0T1VEddua","type":"function","function":{"name":"developer__shell","arguments":""}}],"refusal":null},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-Bk9Ye6Y0t9E7bC3DOMxCpW8eJkTKU","object":"chat.completion.chunk","created":1737368310,"model":"gpt-5.2-1106-preview","service_tier":"default","system_fingerprint":"fp_5f325d54e6","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"command\":\"ls ~/Desktop | wc -l\"}"}}]},"logprobs":null,"finish_reason":null}],"usage":null}
data: {"id":"chatcmpl-Bk9Ye6Y0t9E7bC3DOMxCpW8eJkTKU","object":"chat.completion.chunk","created":1737368310,"model":"gpt-5.2-1106-preview","service_tier":"default","system_fingerprint":"fp_5f325d54e6","choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"tool_calls"}],"usage":null}
data: {"id":"chatcmpl-Bk9Ye6Y0t9E7bC3DOMxCpW8eJkTKU","object":"chat.completion.chunk","created":1737368310,"model":"gpt-5.2-1106-preview","service_tier":"default","system_fingerprint":"fp_5f325d54e6","choices":[],"usage":{"prompt_tokens":8320,"completion_tokens":172,"total_tokens":8492}}
data: [DONE]
"#;

        let result = run_streaming_test(response_lines).await?;

        assert_eq!(result.tool_calls.len(), 1, "Expected 1 tool call");
        assert_eq!(result.tool_calls[0], "developer__shell");
        assert_usage_yielded_once(&result, 8320, 172, 8492);

        Ok(())
    }

    #[tokio::test]
    async fn test_tetrate_claude_streaming_usage_yielded_once() -> anyhow::Result<()> {
        let response_lines = r#"
data: {"id":"msg_01BbvMfNhbdm2hmmTbWjaeYt","choices":[{"index":0,"delta":{"role":"assistant"}}],"created":1768898776,"model":"claude-sonnet-4-5-20250929","object":"chat.completion.chunk"}
data: {"id":"msg_01BbvMfNhbdm2hmmTbWjaeYt","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"toolu_011Yj5pGczhs1597iLXp5XJK","type":"function","function":{"name":"developer__shell","arguments":""}}]}}],"created":1768898776,"model":"claude-sonnet-4-5-20250929","object":"chat.completion.chunk"}
data: {"id":"msg_01BbvMfNhbdm2hmmTbWjaeYt","choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"type":"function","function":{"arguments":"{\"command\": \"find ~/Desktop -type f | wc -l\"}"}}]}}],"created":1768898776,"model":"claude-sonnet-4-5-20250929","object":"chat.completion.chunk"}
data: {"id":"msg_01BbvMfNhbdm2hmmTbWjaeYt","choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}],"created":1768898776,"model":"claude-sonnet-4-5-20250929","object":"chat.completion.chunk","usage":{"completion_tokens":79,"prompt_tokens":12376,"total_tokens":12455}}
data: [DONE]
"#;

        let result = run_streaming_test(response_lines).await?;

        assert_eq!(result.tool_calls.len(), 1, "Expected 1 tool call");
        assert_eq!(result.tool_calls[0], "developer__shell");
        assert_usage_yielded_once(&result, 12376, 79, 12455);

        Ok(())
    }
}