Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 126 additions & 2 deletions crates/goose/src/providers/formats/openai_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,52 @@ pub enum ResponsesStreamEvent {
Error { error: Value },
}

/// Returns `true` when `event_type` is one of the Responses API SSE event
/// types this client knows how to deserialize.
///
/// Unknown event types (e.g. keepalives or newly introduced server-side
/// events) are meant to be skipped by the caller rather than treated as
/// deserialization failures.
fn is_known_responses_stream_event_type(event_type: &str) -> bool {
    // Keep this list in sync with the ResponsesStreamEvent variants.
    const KNOWN_EVENT_TYPES: [&str; 13] = [
        "response.created",
        "response.in_progress",
        "response.output_item.added",
        "response.content_part.added",
        "response.output_text.delta",
        "response.output_item.done",
        "response.content_part.done",
        "response.output_text.done",
        "response.completed",
        "response.failed",
        "response.function_call_arguments.delta",
        "response.function_call_arguments.done",
        "error",
    ];
    KNOWN_EVENT_TYPES.contains(&event_type)
}

fn parse_responses_stream_event(data_line: &str) -> anyhow::Result<Option<ResponsesStreamEvent>> {
let raw_event: Value = serde_json::from_str(data_line).map_err(|e| {
anyhow!(
"Failed to parse Responses stream event: {}: {:?}",
e,
data_line
)
})?;

let Some(event_type) = raw_event.get("type").and_then(Value::as_str) else {
return Ok(None);
};

if !is_known_responses_stream_event_type(event_type) {
return Ok(None);
}

let event = serde_json::from_value(raw_event).map_err(|e| {
anyhow!(
"Failed to parse Responses stream event: {}: {:?}",
e,
data_line
)
})?;
Ok(Some(event))
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ResponseMetadata {
pub id: String,
Expand Down Expand Up @@ -588,6 +634,9 @@ where
if response_str.trim().is_empty() {
continue;
}
if response_str.starts_with(':') {
continue;
}

// Parse SSE format: "event: <type>\ndata: <json>"
// For now, we only care about the data line
Expand All @@ -605,8 +654,9 @@ where
break 'outer;
}

let event: ResponsesStreamEvent = serde_json::from_str(data_line)
.map_err(|e| anyhow!("Failed to parse Responses stream event: {}: {:?}", e, data_line))?;
let Some(event) = parse_responses_stream_event(data_line)? else {
continue;
};

match event {
ResponsesStreamEvent::ResponseCreated { response, .. } |
Expand Down Expand Up @@ -702,3 +752,77 @@ where
}
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::conversation::message::MessageContent;
    use futures::StreamExt;

    /// Unrecognized event types (here a `keepalive`) must be skipped while
    /// the surrounding text deltas and the final usage are still collected.
    #[tokio::test]
    async fn test_responses_stream_ignores_keepalive_event() -> anyhow::Result<()> {
        let sse_lines = vec![
            r#"data: {"type":"response.created","sequence_number":1,"response":{"id":"resp_1","object":"response","created_at":1737368310,"status":"in_progress","model":"gpt-5.2-pro","output":[]}}"#.to_string(),
            r#"data: {"type":"keepalive"}"#.to_string(),
            r#"data: {"type":"response.output_text.delta","sequence_number":2,"item_id":"msg_1","output_index":0,"content_index":0,"delta":"Hello"}"#.to_string(),
            r#"data: {"type":"response.output_text.delta","sequence_number":3,"item_id":"msg_1","output_index":0,"content_index":0,"delta":" world"}"#.to_string(),
            r#"data: {"type":"response.completed","sequence_number":4,"response":{"id":"resp_1","object":"response","created_at":1737368310,"status":"completed","model":"gpt-5.2-pro","output":[],"usage":{"input_tokens":10,"output_tokens":4,"total_tokens":14}}}"#.to_string(),
            "data: [DONE]".to_string(),
        ];

        let line_stream = tokio_stream::iter(sse_lines.into_iter().map(Ok));
        let message_stream = responses_api_to_streaming_message(line_stream);
        futures::pin_mut!(message_stream);

        let mut collected_text = String::new();
        let mut reported_usage: Option<ProviderUsage> = None;

        while let Some(item) = message_stream.next().await {
            let (maybe_message, maybe_usage) = item?;
            // Accumulate every text fragment emitted by the stream in order.
            for content in maybe_message.into_iter().flat_map(|m| m.content) {
                if let MessageContent::Text(text) = content {
                    collected_text.push_str(&text.text);
                }
            }
            // The last usage report wins (emitted alongside completion).
            if maybe_usage.is_some() {
                reported_usage = maybe_usage;
            }
        }

        assert_eq!(collected_text, "Hello world");
        let usage = reported_usage.expect("usage should be present at completion");
        assert_eq!(usage.model, "gpt-5.2-pro");
        assert_eq!(usage.usage.input_tokens, Some(10));
        assert_eq!(usage.usage.output_tokens, Some(4));
        assert_eq!(usage.usage.total_tokens, Some(14));

        Ok(())
    }

    /// An explicit `error` event must still surface as an `Err` item even
    /// though unrecognized event types are silently skipped.
    #[tokio::test]
    async fn test_responses_stream_error_event_still_returns_error() -> anyhow::Result<()> {
        let sse_lines = vec![
            r#"data: {"type":"error","error":{"message":"boom"}}"#.to_string(),
            "data: [DONE]".to_string(),
        ];

        let line_stream = tokio_stream::iter(sse_lines.into_iter().map(Ok));
        let message_stream = responses_api_to_streaming_message(line_stream);
        futures::pin_mut!(message_stream);

        let first_item = message_stream
            .next()
            .await
            .expect("stream should emit an error item");

        let err = first_item.expect_err("expected error");
        assert!(err.to_string().contains("Responses API error"));

        Ok(())
    }
}
31 changes: 28 additions & 3 deletions crates/goose/src/providers/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ impl OpenAiProvider {
normalized.ends_with("responses") || normalized.contains("/responses")
}

fn is_codex_gpt_5_model(model_name: &str) -> bool {
/// Returns `true` for model names that should be served through the
/// Responses API rather than Chat Completions: the GPT-5 Codex family and
/// `gpt-5.2-pro` (including dated snapshots such as
/// `gpt-5.2-pro-2025-12-11`). Matching is case-insensitive.
fn is_responses_model(model_name: &str) -> bool {
    let name = model_name.to_ascii_lowercase();
    if name.starts_with("gpt-5.2-pro") {
        return true;
    }
    name.starts_with("gpt-5") && name.contains("codex")
}
}

fn should_use_responses_api(model_name: &str, base_path: &str) -> bool {
Expand All @@ -246,7 +247,7 @@ impl OpenAiProvider {
}
}

Self::is_codex_gpt_5_model(model_name)
Self::is_responses_model(model_name)
}

fn map_base_path(base_path: &str, target: &str, fallback: &str) -> String {
Expand Down Expand Up @@ -624,6 +625,22 @@ mod tests {
));
}

/// The default chat-completions base path must not prevent gpt-5.2-pro
/// from being routed to the Responses API.
#[test]
fn gpt_5_2_pro_uses_responses_when_base_path_is_default() {
    let uses_responses =
        OpenAiProvider::should_use_responses_api("gpt-5.2-pro", "v1/chat/completions");
    assert!(uses_responses);
}

/// Dated snapshots of gpt-5.2-pro must also be routed to the Responses API.
#[test]
fn gpt_5_2_pro_with_date_uses_responses() {
    let uses_responses =
        OpenAiProvider::should_use_responses_api("gpt-5.2-pro-2025-12-11", "v1/chat/completions");
    assert!(uses_responses);
}

#[test]
fn explicit_chat_path_forces_chat_completions() {
assert!(!OpenAiProvider::should_use_responses_api(
Expand All @@ -632,6 +649,14 @@ mod tests {
));
}

/// Models outside the Responses allow-list stay on Chat Completions.
#[test]
fn gpt_4o_does_not_use_responses() {
    let uses_responses = OpenAiProvider::should_use_responses_api("gpt-4o", "v1/chat/completions");
    assert!(!uses_responses);
}

#[test]
fn custom_chat_path_maps_to_responses_path() {
let responses_path = OpenAiProvider::map_base_path(
Expand Down