diff --git a/crates/goose/src/providers/formats/mod.rs b/crates/goose/src/providers/formats/mod.rs
index 6f3df3d0bf28..e0a88288a49e 100644
--- a/crates/goose/src/providers/formats/mod.rs
+++ b/crates/goose/src/providers/formats/mod.rs
@@ -4,4 +4,5 @@ pub mod databricks;
 pub mod gcpvertexai;
 pub mod google;
 pub mod openai;
+pub mod openai_responses;
 pub mod snowflake;
diff --git a/crates/goose/src/providers/formats/openai_responses.rs b/crates/goose/src/providers/formats/openai_responses.rs
new file mode 100644
index 000000000000..d8c237d4ab8b
--- /dev/null
+++ b/crates/goose/src/providers/formats/openai_responses.rs
@@ -0,0 +1,696 @@
+use crate::conversation::message::{Message, MessageContent};
+use crate::model::ModelConfig;
+use crate::providers::base::{ProviderUsage, Usage};
+use anyhow::{anyhow, Error};
+use async_stream::try_stream;
+use chrono;
+use futures::Stream;
+use rmcp::model::{object, CallToolRequestParam, RawContent, Role, Tool};
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use std::ops::Deref;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ResponsesApiResponse {
+    pub id: String,
+    pub object: String,
+    pub created_at: i64,
+    pub status: String,
+    pub model: String,
+    pub output: Vec<ResponseOutputItem>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub reasoning: Option<ResponseReasoningInfo>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub usage: Option<ResponseUsage>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "type")]
+#[serde(rename_all = "snake_case")]
+pub enum ResponseOutputItem {
+    Reasoning {
+        id: String,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        summary: Option<Vec<Value>>,
+    },
+    Message {
+        id: String,
+        status: String,
+        role: String,
+        content: Vec<ResponseContentBlock>,
+    },
+    FunctionCall {
+        id: String,
+        status: String,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        call_id: Option<String>,
+        name: String,
+        arguments: String,
+    },
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(tag = "type")]
+#[serde(rename_all = "snake_case")]
+pub enum ResponseContentBlock {
+    OutputText {
+        text: String,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        annotations: Option<Vec<Value>>,
+    },
+    ToolCall {
+        id: String,
+        name: String,
+        input: Value,
+    },
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ResponseReasoningInfo {
+    pub effort: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub summary: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ResponseUsage {
+    pub input_tokens: i32,
+    pub output_tokens: i32,
+    pub total_tokens: i32,
+}
+
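+/// Server-sent events emitted by the streaming Responses API, discriminated by
+/// the `type` field of each SSE `data:` payload. An illustrative text delta
+/// (field values invented for the example) looks like:
+/// `{"type":"response.output_text.delta","sequence_number":3,"item_id":"msg_1","output_index":0,"content_index":0,"delta":"Hel"}`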
"Option::is_none")] + obfuscation: Option, + }, + #[serde(rename = "response.output_item.done")] + OutputItemDone { + sequence_number: i32, + output_index: i32, + item: ResponseOutputItemInfo, + }, + #[serde(rename = "response.content_part.done")] + ContentPartDone { + sequence_number: i32, + item_id: String, + output_index: i32, + content_index: i32, + part: ContentPart, + }, + #[serde(rename = "response.output_text.done")] + OutputTextDone { + sequence_number: i32, + item_id: String, + output_index: i32, + content_index: i32, + text: String, + #[serde(skip_serializing_if = "Option::is_none")] + logprobs: Option>, + }, + #[serde(rename = "response.completed")] + ResponseCompleted { + sequence_number: i32, + response: ResponseMetadata, + }, + #[serde(rename = "response.failed")] + ResponseFailed { sequence_number: i32, error: Value }, + #[serde(rename = "response.function_call_arguments.delta")] + FunctionCallArgumentsDelta { + sequence_number: i32, + item_id: String, + output_index: i32, + delta: String, + #[serde(skip_serializing_if = "Option::is_none")] + obfuscation: Option, + }, + #[serde(rename = "response.function_call_arguments.done")] + FunctionCallArgumentsDone { + sequence_number: i32, + item_id: String, + output_index: i32, + arguments: String, + }, + #[serde(rename = "error")] + Error { error: Value }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ResponseMetadata { + pub id: String, + pub object: String, + pub created_at: i64, + pub status: String, + pub model: String, + pub output: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub usage: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub reasoning: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type")] +#[serde(rename_all = "snake_case")] +pub enum ResponseOutputItemInfo { + Reasoning { + id: String, + summary: Vec, + }, + Message { + id: String, + status: String, + role: String, + content: Vec, + }, + FunctionCall { + id: String, + status: String, + call_id: String, + name: String, + arguments: String, + }, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type")] +#[serde(rename_all = "snake_case")] +pub enum ContentPart { + OutputText { + text: String, + #[serde(skip_serializing_if = "Option::is_none")] + annotations: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + logprobs: Option>, + }, + ToolCall { + id: String, + name: String, + arguments: String, + }, +} + +fn add_conversation_history(input_items: &mut Vec, messages: &[Message]) { + for message in messages.iter().filter(|m| m.is_agent_visible()) { + let has_only_tool_content = message.content.iter().all(|c| { + matches!( + c, + MessageContent::ToolRequest(_) | MessageContent::ToolResponse(_) + ) + }); + + if has_only_tool_content { + continue; + } + + if message.role != Role::User && message.role != Role::Assistant { + continue; + } + + let role = match message.role { + Role::User => "user", + Role::Assistant => "assistant", + }; + + let mut content_items = Vec::new(); + for content in &message.content { + if let MessageContent::Text(text) = content { + if !text.text.is_empty() { + let content_type = if message.role == Role::Assistant { + "output_text" + } else { + "input_text" + }; + content_items.push(json!({ + "type": content_type, + "text": text.text + })); + } + } + } + + if !content_items.is_empty() { + input_items.push(json!({ + "role": role, + "content": content_items + })); + } + } +} + +fn add_function_calls(input_items: &mut Vec, messages: &[Message]) 
+fn add_function_calls(input_items: &mut Vec<Value>, messages: &[Message]) {
+    for message in messages.iter().filter(|m| m.is_agent_visible()) {
+        if message.role == Role::Assistant {
+            for content in &message.content {
+                if let MessageContent::ToolRequest(request) = content {
+                    if let Ok(tool_call) = &request.tool_call {
+                        let arguments_str = tool_call
+                            .arguments
+                            .as_ref()
+                            .map(|args| {
+                                serde_json::to_string(args).unwrap_or_else(|_| "{}".to_string())
+                            })
+                            .unwrap_or_else(|| "{}".to_string());
+
+                        tracing::debug!(
+                            "Replaying function_call with call_id: {}, name: {}",
+                            request.id,
+                            tool_call.name
+                        );
+                        input_items.push(json!({
+                            "type": "function_call",
+                            "call_id": request.id,
+                            "name": tool_call.name,
+                            "arguments": arguments_str
+                        }));
+                    }
+                }
+            }
+        }
+    }
+}
+
+fn add_function_call_outputs(input_items: &mut Vec<Value>, messages: &[Message]) {
+    for message in messages.iter().filter(|m| m.is_agent_visible()) {
+        for content in &message.content {
+            if let MessageContent::ToolResponse(response) = content {
+                match &response.tool_result {
+                    Ok(contents) => {
+                        let text_content: Vec<String> = contents
+                            .content
+                            .iter()
+                            .filter_map(|c| {
+                                if let RawContent::Text(t) = c.deref() {
+                                    Some(t.text.clone())
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect();
+
+                        if !text_content.is_empty() {
+                            tracing::debug!(
+                                "Sending function_call_output with call_id: {}",
+                                response.id
+                            );
+                            input_items.push(json!({
+                                "type": "function_call_output",
+                                "call_id": response.id,
+                                "output": text_content.join("\n")
+                            }));
+                        }
+                    }
+                    Err(error_data) => {
+                        // Handle error responses - must send them back to the API
+                        // to avoid "No tool output found" errors
+                        tracing::debug!(
+                            "Sending function_call_output error with call_id: {}",
+                            response.id
+                        );
+                        input_items.push(json!({
+                            "type": "function_call_output",
+                            "call_id": response.id,
+                            "output": format!("Error: {}", error_data.message)
+                        }));
+                    }
+                }
+            }
+        }
+    }
+}
+
+pub fn create_responses_request(
+    model_config: &ModelConfig,
+    system: &str,
+    messages: &[Message],
+    tools: &[Tool],
+) -> anyhow::Result<Value> {
+    let mut input_items = Vec::new();
+
+    if !system.is_empty() {
+        input_items.push(json!({
+            "role": "system",
+            "content": [{
+                "type": "input_text",
+                "text": system
+            }]
+        }));
+    }
+
+    add_conversation_history(&mut input_items, messages);
+    add_function_calls(&mut input_items, messages);
+    add_function_call_outputs(&mut input_items, messages);
+
+    let mut payload = json!({
+        "model": model_config.model_name,
+        "input": input_items,
+        "store": false, // Don't store responses on the server (we replay history ourselves)
+    });
+
+    if !tools.is_empty() {
+        let tools_spec: Vec<Value> = tools
+            .iter()
+            .map(|tool| {
+                json!({
+                    "type": "function",
+                    "name": tool.name,
+                    "description": tool.description,
+                    "parameters": tool.input_schema,
+                })
+            })
+            .collect();
+
+        payload
+            .as_object_mut()
+            .unwrap()
+            .insert("tools".to_string(), json!(tools_spec));
+    }
+
+    if let Some(temp) = model_config.temperature {
+        payload
+            .as_object_mut()
+            .unwrap()
+            .insert("temperature".to_string(), json!(temp));
+    }
+
+    if let Some(tokens) = model_config.max_tokens {
+        payload
+            .as_object_mut()
+            .unwrap()
+            .insert("max_output_tokens".to_string(), json!(tokens));
+    }
+
+    Ok(payload)
+}
+
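+/// Converts a complete (non-streaming) Responses API response into a single
+/// assistant `Message`: reasoning items are skipped, `output_text` blocks become
+/// text content, and function calls become tool requests with their JSON
+/// arguments parsed (falling back to `{}` if parsing fails).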
+pub fn responses_api_to_message(response: &ResponsesApiResponse) -> anyhow::Result<Message> {
+    let mut content = Vec::new();
+
+    for item in &response.output {
+        match item {
+            ResponseOutputItem::Reasoning { .. } => {
+                continue;
+            }
+            ResponseOutputItem::Message {
+                content: msg_content,
+                ..
+            } => {
+                for block in msg_content {
+                    match block {
+                        ResponseContentBlock::OutputText { text, .. } => {
+                            if !text.is_empty() {
+                                content.push(MessageContent::text(text));
+                            }
+                        }
+                        ResponseContentBlock::ToolCall { id, name, input } => {
+                            content.push(MessageContent::tool_request(
+                                id.clone(),
+                                Ok(CallToolRequestParam {
+                                    name: name.clone().into(),
+                                    arguments: Some(object(input.clone())),
+                                }),
+                            ));
+                        }
+                    }
+                }
+            }
+            ResponseOutputItem::FunctionCall {
+                id,
+                name,
+                arguments,
+                ..
+            } => {
+                tracing::debug!("Received FunctionCall with id: {}, name: {}", id, name);
+                let parsed_args = if arguments.is_empty() {
+                    json!({})
+                } else {
+                    serde_json::from_str(arguments).unwrap_or_else(|_| json!({}))
+                };
+
+                content.push(MessageContent::tool_request(
+                    id.clone(),
+                    Ok(CallToolRequestParam {
+                        name: name.clone().into(),
+                        arguments: Some(object(parsed_args)),
+                    }),
+                ));
+            }
+        }
+    }
+
+    let mut message = Message::new(Role::Assistant, chrono::Utc::now().timestamp(), content);
+
+    message = message.with_id(response.id.clone());
+
+    Ok(message)
+}
+
+pub fn get_responses_usage(response: &ResponsesApiResponse) -> Usage {
+    response.usage.as_ref().map_or_else(Usage::default, |u| {
+        Usage::new(
+            Some(u.input_tokens),
+            Some(u.output_tokens),
+            Some(u.total_tokens),
+        )
+    })
+}
+
+fn process_streaming_output_items(
+    output_items: Vec<ResponseOutputItemInfo>,
+    is_text_response: bool,
+) -> Vec<MessageContent> {
+    let mut content = Vec::new();
+
+    for item in output_items {
+        match item {
+            ResponseOutputItemInfo::Reasoning { .. } => {
+                // Skip reasoning items
+            }
+            ResponseOutputItemInfo::Message { content: parts, .. } => {
+                for part in parts {
+                    match part {
+                        ContentPart::OutputText { text, .. } => {
+                            if !text.is_empty() && !is_text_response {
+                                content.push(MessageContent::text(&text));
+                            }
+                        }
+                        ContentPart::ToolCall {
+                            id,
+                            name,
+                            arguments,
+                        } => {
+                            let parsed_args = if arguments.is_empty() {
+                                json!({})
+                            } else {
+                                serde_json::from_str(&arguments).unwrap_or_else(|_| json!({}))
+                            };
+
+                            content.push(MessageContent::tool_request(
+                                id,
+                                Ok(CallToolRequestParam {
+                                    name: name.into(),
+                                    arguments: Some(object(parsed_args)),
+                                }),
+                            ));
+                        }
+                    }
+                }
+            }
+            ResponseOutputItemInfo::FunctionCall {
+                call_id,
+                name,
+                arguments,
+                ..
+            } => {
+                let parsed_args = if arguments.is_empty() {
+                    json!({})
+                } else {
+                    serde_json::from_str(&arguments).unwrap_or_else(|_| json!({}))
+                };
+
+                content.push(MessageContent::tool_request(
+                    call_id,
+                    Ok(CallToolRequestParam {
+                        name: name.into(),
+                        arguments: Some(object(parsed_args)),
+                    }),
+                ));
+            }
+        }
+    }
+
+    content
+}
+
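+/// Converts a line-framed SSE stream from the Responses API into a stream of
+/// `(Option<Message>, Option<ProviderUsage>)` pairs: each `output_text.delta`
+/// event is yielded immediately as an incremental text message, and once the
+/// response completes, a final message carrying any tool calls is yielded
+/// together with the token usage.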
+pub fn responses_api_to_streaming_message<S>(
+    mut stream: S,
+) -> impl Stream<Item = anyhow::Result<(Option<Message>, Option<ProviderUsage>)>> + 'static
+where
+    S: Stream<Item = Result<String, Error>> + Unpin + Send + 'static,
+{
+    try_stream! {
+        use futures::StreamExt;
+
+        let mut accumulated_text = String::new();
+        let mut response_id: Option<String> = None;
+        let mut model_name: Option<String> = None;
+        let mut final_usage: Option<ProviderUsage> = None;
+        let mut output_items: Vec<ResponseOutputItemInfo> = Vec::new();
+        let mut is_text_response = false;
+
+        'outer: while let Some(response) = stream.next().await {
+            let response_str = response?;
+
+            // Skip empty lines
+            if response_str.trim().is_empty() {
+                continue;
+            }
+
+            // Parse SSE format: "event: <event_type>\ndata: <json>"
+            // For now, we only care about the data line
+            let data_line = if response_str.starts_with("data: ") {
+                response_str.strip_prefix("data: ").unwrap()
+            } else if response_str.starts_with("event: ") {
+                // Skip event type lines
+                continue;
+            } else {
+                // Try to parse as-is in case there's no prefix
+                &response_str
+            };
+
+            if data_line == "[DONE]" {
+                break 'outer;
+            }
+
+            let event: ResponsesStreamEvent = serde_json::from_str(data_line)
+                .map_err(|e| anyhow!("Failed to parse Responses stream event: {}: {:?}", e, data_line))?;
+
+            match event {
+                ResponsesStreamEvent::ResponseCreated { response, .. }
+                | ResponsesStreamEvent::ResponseInProgress { response, .. } => {
+                    response_id = Some(response.id);
+                    model_name = Some(response.model);
+                }
+
+                ResponsesStreamEvent::OutputTextDelta { delta, .. } => {
+                    is_text_response = true;
+                    accumulated_text.push_str(&delta);
+
+                    // Yield incremental text updates for true streaming
+                    let mut content = Vec::new();
+                    if !delta.is_empty() {
+                        content.push(MessageContent::text(&delta));
+                    }
+                    let mut msg = Message::new(Role::Assistant, chrono::Utc::now().timestamp(), content);
+
+                    // Add ID so the desktop client knows these deltas belong to the same message
+                    if let Some(id) = &response_id {
+                        msg = msg.with_id(id.clone());
+                    }
+
+                    yield (Some(msg), None);
+                }
+
+                ResponsesStreamEvent::OutputItemDone { item, .. } => {
+                    output_items.push(item);
+                }
+
+                ResponsesStreamEvent::OutputTextDone { .. } => {
+                    // Text is already complete from the deltas; this is just a summary event
+                }
+
+                ResponsesStreamEvent::ResponseCompleted { response, .. } => {
+                    let model = model_name.as_ref().unwrap_or(&response.model);
+                    let usage = response.usage.as_ref().map_or_else(
+                        Usage::default,
+                        |u| Usage::new(
+                            Some(u.input_tokens),
+                            Some(u.output_tokens),
+                            Some(u.total_tokens),
+                        ),
+                    );
+                    final_usage = Some(ProviderUsage {
+                        usage,
+                        model: model.clone(),
+                    });
+
+                    // For complete output, use the response output items
+                    if !response.output.is_empty() {
+                        output_items = response.output;
+                    }
+
+                    break 'outer;
+                }
+
+                ResponsesStreamEvent::FunctionCallArgumentsDelta { .. } => {
+                    // Function call arguments are being streamed, but we'll get the complete
+                    // arguments in the OutputItemDone event, so we can ignore deltas for now
+                }
+
+                ResponsesStreamEvent::FunctionCallArgumentsDone { .. } => {
+                    // Arguments are complete, will be in the OutputItemDone event
+                }
+
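+                // Terminal failure events: surface these as errors on the
+                // stream instead of ending it silently.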
+                ResponsesStreamEvent::ResponseFailed { error, .. } => {
+                    Err(anyhow!("Responses API failed: {:?}", error))?;
+                }
+
+                ResponsesStreamEvent::Error { error } => {
+                    Err(anyhow!("Responses API error: {:?}", error))?;
+                }
+
+                _ => {
+                    // Ignore other event types (OutputItemAdded, ContentPartAdded, ContentPartDone)
+                }
+            }
+        }
+
+        // Process final output items and yield usage data
+        let content = process_streaming_output_items(output_items, is_text_response);
+
+        if !content.is_empty() {
+            let mut message = Message::new(Role::Assistant, chrono::Utc::now().timestamp(), content);
+            if let Some(id) = response_id {
+                message = message.with_id(id);
+            }
+            yield (Some(message), final_usage);
+        } else if let Some(usage) = final_usage {
+            yield (None, Some(usage));
+        }
+    }
+}
diff --git a/crates/goose/src/providers/openai.rs b/crates/goose/src/providers/openai.rs
index 07b0339bdf1c..9af5579cd1ee 100644
--- a/crates/goose/src/providers/openai.rs
+++ b/crates/goose/src/providers/openai.rs
@@ -15,7 +15,13 @@ use super::api_client::{ApiClient, AuthMethod};
 use super::base::{ConfigKey, ModelInfo, Provider, ProviderMetadata, ProviderUsage, Usage};
 use super::embedding::{EmbeddingCapable, EmbeddingRequest, EmbeddingResponse};
 use super::errors::ProviderError;
-use super::formats::openai::{create_request, get_usage, response_to_message};
+use super::formats::openai::{
+    create_request, get_usage, response_to_message, response_to_streaming_message,
+};
+use super::formats::openai_responses::{
+    create_responses_request, get_responses_usage, responses_api_to_message,
+    responses_api_to_streaming_message, ResponsesApiResponse,
+};
 use super::retry::ProviderRetry;
 use super::utils::{
     get_model, handle_response_openai_compat, handle_status_openai_compat, ImageFormat,
@@ -25,7 +31,6 @@ use crate::conversation::message::Message;
 use crate::model::ModelConfig;
 use crate::providers::base::MessageStream;
-use crate::providers::formats::openai::response_to_streaming_message;
 use crate::providers::utils::RequestLog;
 use rmcp::model::Tool;
 
@@ -41,6 +46,8 @@ pub const OPEN_AI_KNOWN_MODELS: &[(&str, usize)] = &[
     ("gpt-3.5-turbo", 16_385),
     ("gpt-4-turbo", 128_000),
     ("o4-mini", 128_000),
+    ("gpt-5.1-codex", 400_000),
+    ("gpt-5-codex", 400_000),
 ];
 
 pub const OPEN_AI_DOC_URL: &str = "https://platform.openai.com/docs/models";
@@ -184,6 +191,10 @@ impl OpenAiProvider {
         })
     }
 
+    fn uses_responses_api(model_name: &str) -> bool {
+        model_name.starts_with("gpt-5-codex") || model_name.starts_with("gpt-5.1-codex")
+    }
+
     async fn post(&self, payload: &Value) -> Result<Value, ProviderError> {
         let response = self
             .api_client
             .response_post(&self.base_path, payload)
             .await?;
         handle_response_openai_compat(response).await
     }
+
+    async fn post_responses(&self, payload: &Value) -> Result<Value, ProviderError> {
+        let response = self
+            .api_client
+            .response_post("v1/responses", payload)
+            .await?;
+        handle_response_openai_compat(response).await
+    }
 }
 
 #[async_trait]
@@ -238,31 +257,62 @@ impl Provider for OpenAiProvider {
         messages: &[Message],
         tools: &[Tool],
     ) -> Result<(Message, ProviderUsage), ProviderError> {
-        let payload = create_request(model_config, system, messages, tools, &ImageFormat::OpenAi)?;
-
-        let mut log = RequestLog::start(&self.model, &payload)?;
-        let json_response = self
-            .with_retry(|| async {
-                let payload_clone = payload.clone();
-                self.post(&payload_clone).await
-            })
-            .await
-            .inspect_err(|e| {
-                let _ = log.error(e);
-            })?;
-
-        let message = response_to_message(&json_response)?;
-        let usage = json_response
-            .get("usage")
-            .map(get_usage)
-            .unwrap_or_else(|| {
-                tracing::debug!("Failed to get usage data");
-                Usage::default()
-            });
-
-        let model = get_model(&json_response);
-        log.write(&json_response, Some(&usage))?;
-        Ok((message, ProviderUsage::new(model, usage)))
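+        // Route Codex models through the Responses API; everything else keeps
+        // the existing Chat Completions path.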
+        if Self::uses_responses_api(&model_config.model_name) {
+            let payload = create_responses_request(model_config, system, messages, tools)?;
+            let mut log = RequestLog::start(&self.model, &payload)?;
+
+            let json_response = self
+                .with_retry(|| async {
+                    let payload_clone = payload.clone();
+                    self.post_responses(&payload_clone).await
+                })
+                .await
+                .inspect_err(|e| {
+                    let _ = log.error(e);
+                })?;
+
+            let responses_api_response: ResponsesApiResponse =
+                serde_json::from_value(json_response.clone()).map_err(|e| {
+                    ProviderError::ExecutionError(format!(
+                        "Failed to parse responses API response: {}",
+                        e
+                    ))
+                })?;
+
+            let message = responses_api_to_message(&responses_api_response)?;
+            let usage = get_responses_usage(&responses_api_response);
+            let model = responses_api_response.model.clone();
+
+            log.write(&json_response, Some(&usage))?;
+            Ok((message, ProviderUsage::new(model, usage)))
+        } else {
+            let payload =
+                create_request(model_config, system, messages, tools, &ImageFormat::OpenAi)?;
+
+            let mut log = RequestLog::start(&self.model, &payload)?;
+            let json_response = self
+                .with_retry(|| async {
+                    let payload_clone = payload.clone();
+                    self.post(&payload_clone).await
+                })
+                .await
+                .inspect_err(|e| {
+                    let _ = log.error(e);
+                })?;
+
+            let message = response_to_message(&json_response)?;
+            let usage = json_response
+                .get("usage")
+                .map(get_usage)
+                .unwrap_or_else(|| {
+                    tracing::debug!("Failed to get usage data");
+                    Usage::default()
+                });
+
+            let model = get_model(&json_response);
+            log.write(&json_response, Some(&usage))?;
+            Ok((message, ProviderUsage::new(model, usage)))
+        }
     }
 
     async fn fetch_supported_models(&self) -> Result<Option<Vec<String>>, ProviderError> {
@@ -319,40 +369,77 @@
         messages: &[Message],
         tools: &[Tool],
     ) -> Result<MessageStream, ProviderError> {
-        let mut payload =
-            create_request(&self.model, system, messages, tools, &ImageFormat::OpenAi)?;
-        payload["stream"] = serde_json::Value::Bool(true);
-        payload["stream_options"] = json!({
-            "include_usage": true,
-        });
-        let mut log = RequestLog::start(&self.model, &payload)?;
-
-        let response = self
-            .with_retry(|| async {
-                let resp = self
-                    .api_client
-                    .response_post(&self.base_path, &payload)
-                    .await?;
-                handle_status_openai_compat(resp).await
-            })
-            .await
-            .inspect_err(|e| {
-                let _ = log.error(e);
-            })?;
-        let stream = response.bytes_stream().map_err(io::Error::other);
-
-        Ok(Box::pin(try_stream! {
-            let stream_reader = StreamReader::new(stream);
-            let framed = FramedRead::new(stream_reader, LinesCodec::new()).map_err(anyhow::Error::from);
-
-            let message_stream = response_to_streaming_message(framed);
-            pin!(message_stream);
-            while let Some(message) = message_stream.next().await {
-                let (message, usage) = message.map_err(|e| ProviderError::RequestFailed(format!("Stream decode error: {}", e)))?;
-                log.write(&message, usage.as_ref().map(|f| f.usage).as_ref())?;
-                yield (message, usage);
-            }
-        }))
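+        // Same routing as complete(): Codex models stream over the Responses
+        // API SSE endpoint, other models over Chat Completions.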
+        if Self::uses_responses_api(&self.model.model_name) {
+            let mut payload = create_responses_request(&self.model, system, messages, tools)?;
+            payload["stream"] = serde_json::Value::Bool(true);
+
+            let mut log = RequestLog::start(&self.model, &payload)?;
+
+            let response = self
+                .with_retry(|| async {
+                    let payload_clone = payload.clone();
+                    let resp = self
+                        .api_client
+                        .response_post("v1/responses", &payload_clone)
+                        .await?;
+                    handle_status_openai_compat(resp).await
+                })
+                .await
+                .inspect_err(|e| {
+                    let _ = log.error(e);
+                })?;
+
+            let stream = response.bytes_stream().map_err(io::Error::other);
+
+            Ok(Box::pin(try_stream! {
+                let stream_reader = StreamReader::new(stream);
+                let framed = FramedRead::new(stream_reader, LinesCodec::new()).map_err(anyhow::Error::from);
+
+                let message_stream = responses_api_to_streaming_message(framed);
+                pin!(message_stream);
+                while let Some(message) = message_stream.next().await {
+                    let (message, usage) = message.map_err(|e| ProviderError::RequestFailed(format!("Stream decode error: {}", e)))?;
+                    log.write(&message, usage.as_ref().map(|f| f.usage).as_ref())?;
+                    yield (message, usage);
+                }
+            }))
+        } else {
+            let mut payload =
+                create_request(&self.model, system, messages, tools, &ImageFormat::OpenAi)?;
+            payload["stream"] = serde_json::Value::Bool(true);
+            payload["stream_options"] = json!({
+                "include_usage": true,
+            });
+            let mut log = RequestLog::start(&self.model, &payload)?;
+
+            let response = self
+                .with_retry(|| async {
+                    let resp = self
+                        .api_client
+                        .response_post(&self.base_path, &payload)
+                        .await?;
+                    handle_status_openai_compat(resp).await
+                })
+                .await
+                .inspect_err(|e| {
+                    let _ = log.error(e);
+                })?;
+
+            let stream = response.bytes_stream().map_err(io::Error::other);
+
+            Ok(Box::pin(try_stream! {
+                let stream_reader = StreamReader::new(stream);
+                let framed = FramedRead::new(stream_reader, LinesCodec::new()).map_err(anyhow::Error::from);
+
+                let message_stream = response_to_streaming_message(framed);
+                pin!(message_stream);
+                while let Some(message) = message_stream.next().await {
+                    let (message, usage) = message.map_err(|e| ProviderError::RequestFailed(format!("Stream decode error: {}", e)))?;
+                    log.write(&message, usage.as_ref().map(|f| f.usage).as_ref())?;
+                    yield (message, usage);
+                }
+            }))
+        }
     }
 }