diff --git a/sgl-router/src/routers/conversations/handlers.rs b/sgl-router/src/routers/conversations/handlers.rs new file mode 100644 index 000000000000..a39de9a3bae3 --- /dev/null +++ b/sgl-router/src/routers/conversations/handlers.rs @@ -0,0 +1,827 @@ +//! Conversation CRUD handlers - shared across routers + +use std::sync::Arc; + +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, + Json, +}; +use chrono::Utc; +use serde_json::{json, Value}; +use tracing::{debug, info, warn}; + +use crate::data_connector::{ + Conversation, ConversationId, ConversationItem, ConversationItemId, ConversationItemStorage, + ConversationStorage, ListParams, NewConversation, NewConversationItem, SortOrder, +}; + +pub const MAX_METADATA_PROPERTIES: usize = 16; + +/// Helper to check conversation exists, returning appropriate error response if not +async fn ensure_conversation_exists( + conversation_storage: &Arc, + conv_id: &ConversationId, +) -> Result { + match conversation_storage.get_conversation(conv_id).await { + Ok(Some(conv)) => Ok(conv), + Ok(None) => Err(( + StatusCode::NOT_FOUND, + Json(json!({"error": "Conversation not found"})), + ) + .into_response()), + Err(e) => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to get conversation: {}", e) + })), + ) + .into_response()), + } +} + +const SUPPORTED_ITEM_TYPES: &[&str] = &[ + "message", + "reasoning", + "mcp_list_tools", + "mcp_call", + "item_reference", + "function_call", + "function_call_output", + "file_search_call", + "computer_call", + "computer_call_output", + "web_search_call", + "image_generation_call", + "code_interpreter_call", + "local_shell_call", + "local_shell_call_output", + "mcp_approval_request", + "mcp_approval_response", + "custom_tool_call", + "custom_tool_call_output", +]; + +const IMPLEMENTED_ITEM_TYPES: &[&str] = &[ + "message", + "reasoning", + "mcp_list_tools", + "mcp_call", + "item_reference", +]; + +pub async fn create_conversation( + conversation_storage: &Arc, + body: Value, +) -> Response { + let metadata = match body.get("metadata") { + Some(Value::Object(map)) => { + if map.len() > MAX_METADATA_PROPERTIES { + return ( + StatusCode::BAD_REQUEST, + Json(json!({ + "error": + format!( + "metadata cannot have more than {} properties", + MAX_METADATA_PROPERTIES + ) + })), + ) + .into_response(); + } + Some(map.clone()) + } + Some(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "metadata must be an object"})), + ) + .into_response(); + } + None => None, + }; + + let new_conv = NewConversation { id: None, metadata }; + + match conversation_storage.create_conversation(new_conv).await { + Ok(conversation) => { + info!(conversation_id = %conversation.id.0, "Created conversation"); + (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to create conversation: {}", e) + })), + ) + .into_response(), + } +} + +pub async fn get_conversation( + conversation_storage: &Arc, + conv_id: &str, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + + match conversation_storage + .get_conversation(&conversation_id) + .await + { + Ok(Some(conversation)) => { + (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Conversation not found"})), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to get conversation: {}", e) + })), + ) + .into_response(), + } +} + +pub async fn update_conversation( + conversation_storage: &Arc, + conv_id: &str, + body: Value, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + + let current_meta = match conversation_storage + .get_conversation(&conversation_id) + .await + { + Ok(Some(meta)) => meta, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Conversation not found"})), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to get conversation: {}", e) + })), + ) + .into_response(); + } + }; + + #[derive(Debug)] + enum Patch { + Set(String, Value), + Delete(String), + } + + let mut patches: Vec = Vec::new(); + + if let Some(metadata_val) = body.get("metadata") { + if let Some(map) = metadata_val.as_object() { + for (k, v) in map { + if v.is_null() { + patches.push(Patch::Delete(k.clone())); + } else { + patches.push(Patch::Set(k.clone(), v.clone())); + } + } + } else { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "metadata must be an object"})), + ) + .into_response(); + } + } + + let mut new_metadata = current_meta.metadata.clone().unwrap_or_default(); + for patch in patches { + match patch { + Patch::Set(k, v) => { + new_metadata.insert(k, v); + } + Patch::Delete(k) => { + new_metadata.remove(&k); + } + } + } + + if new_metadata.len() > MAX_METADATA_PROPERTIES { + return ( + StatusCode::BAD_REQUEST, + Json(json!({ + "error": + format!( + "metadata cannot have more than {} properties", + MAX_METADATA_PROPERTIES + ) + })), + ) + .into_response(); + } + + let final_metadata = if new_metadata.is_empty() { + None + } else { + Some(new_metadata) + }; + + match conversation_storage + .update_conversation(&conversation_id, final_metadata) + .await + { + Ok(Some(conversation)) => { + info!(conversation_id = %conversation_id.0, "Updated conversation"); + (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Conversation not found"})), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to update conversation: {}", e) + })), + ) + .into_response(), + } +} + +pub async fn delete_conversation( + conversation_storage: &Arc, + conv_id: &str, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + + if let Err(response) = ensure_conversation_exists(conversation_storage, &conversation_id).await + { + return response; + } + + match conversation_storage + .delete_conversation(&conversation_id) + .await + { + Ok(_) => { + info!(conversation_id = %conversation_id.0, "Deleted conversation"); + ( + StatusCode::OK, + Json(json!({ + "id": conversation_id.0, + "object": "conversation.deleted", + "deleted": true + })), + ) + .into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to delete conversation: {}", e) + })), + ) + .into_response(), + } +} + +pub async fn list_conversation_items( + conversation_storage: &Arc, + item_storage: &Arc, + conv_id: &str, + limit: Option, + order: Option<&str>, + after: Option<&str>, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + + if let Err(response) = ensure_conversation_exists(conversation_storage, &conversation_id).await + { + return response; + } + + let limit = limit.unwrap_or(100); + let order = match order { + Some("asc") => SortOrder::Asc, + _ => SortOrder::Desc, + }; + + let params = ListParams { + limit, + order, + after: after.map(String::from), + }; + + match item_storage.list_items(&conversation_id, params).await { + Ok(items) => { + let item_values: Vec = items + .iter() + .map(|item| { + let mut item_json = item_to_json(item); + if let Some(obj) = item_json.as_object_mut() { + obj.insert("created_at".to_string(), json!(item.created_at)); + } + item_json + }) + .collect(); + + let has_more = items.len() == limit; + let last_id = items.last().map(|item| item.id.0.clone()); + + ( + StatusCode::OK, + Json(json!({ + "object": "list", + "data": item_values, + "has_more": has_more, + "first_id": items.first().map(|item| &item.id.0), + "last_id": last_id, + })), + ) + .into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to list items: {}", e) })), + ) + .into_response(), + } +} + +pub async fn create_conversation_items( + conversation_storage: &Arc, + item_storage: &Arc, + conv_id: &str, + body: Value, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + + if let Err(response) = ensure_conversation_exists(conversation_storage, &conversation_id).await + { + return response; + } + + let items_array = match body.get("items").and_then(|v| v.as_array()) { + Some(arr) => arr, + None => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "Missing or invalid 'items' field"})), + ) + .into_response(); + } + }; + + if items_array.len() > 20 { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "Cannot add more than 20 items at a time"})), + ) + .into_response(); + } + + let mut created_items = Vec::new(); + let mut warnings = Vec::new(); + let added_at = Utc::now(); + + for item_val in items_array { + let item_type = item_val + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or("message"); + + if item_type == "item_reference" { + let ref_id = match item_val.get("id").and_then(|v| v.as_str()) { + Some(id) => id, + None => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "item_reference requires 'id' field"})), + ) + .into_response(); + } + }; + + let existing_item_id = ConversationItemId::from(ref_id); + + let existing_item = match item_storage.get_item(&existing_item_id).await { + Ok(Some(item)) => item, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({ + "error": format!("Referenced item '{}' not found", ref_id) + })), + ) + .into_response(); + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to get referenced item: {}", e) + })), + ) + .into_response(); + } + }; + + if let Err(e) = item_storage + .link_item(&conversation_id, &existing_item.id, added_at) + .await + { + warn!("Failed to link item {}: {}", existing_item.id.0, e); + } + + created_items.push(item_to_json(&existing_item)); + continue; + } + + let user_provided_id = item_val.get("id").and_then(|v| v.as_str()); + + let item = if let Some(id_str) = user_provided_id { + let item_id = ConversationItemId::from(id_str); + + let is_already_linked = match item_storage + .is_item_linked(&conversation_id, &item_id) + .await + { + Ok(linked) => linked, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to check item link: {}", e) + })), + ) + .into_response(); + } + }; + + if is_already_linked { + return ( + StatusCode::BAD_REQUEST, + Json(json!({ + "error": { + "message": "Item already in conversation", + "type": "invalid_request_error", + "param": "items", + "code": "item_already_in_conversation" + } + })), + ) + .into_response(); + } + + let existing_item = match item_storage.get_item(&item_id).await { + Ok(Some(item)) => item, + Ok(None) => { + let (new_item, warning) = match parse_item_from_value(item_val) { + Ok((mut item, warn)) => { + item.id = Some(item_id.clone()); + (item, warn) + } + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({ "error": format!("Invalid item: {}", e) })), + ) + .into_response(); + } + }; + + if let Some(w) = warning { + warnings.push(w); + } + + match item_storage.create_item(new_item).await { + Ok(item) => item, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to create item: {}", e) })), + ) + .into_response(); + } + } + } + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to check item existence: {}", e) + })), + ) + .into_response(); + } + }; + + existing_item + } else { + let (new_item, warning) = match parse_item_from_value(item_val) { + Ok((item, warn)) => (item, warn), + Err(e) => { + return ( + StatusCode::BAD_REQUEST, + Json(json!({ "error": format!("Invalid item: {}", e) })), + ) + .into_response(); + } + }; + + if let Some(w) = warning { + warnings.push(w); + } + + match item_storage.create_item(new_item).await { + Ok(item) => item, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to create item: {}", e) })), + ) + .into_response(); + } + } + }; + + if let Err(e) = item_storage + .link_item(&conversation_id, &item.id, added_at) + .await + { + warn!("Failed to link item {}: {}", item.id.0, e); + } + + created_items.push(item_to_json(&item)); + } + + let first_id = created_items.first().and_then(|v| v.get("id")); + let last_id = created_items.last().and_then(|v| v.get("id")); + + let mut response = json!({ + "object": "list", + "data": created_items, + "first_id": first_id, + "last_id": last_id, + "has_more": false + }); + + if !warnings.is_empty() { + if let Some(obj) = response.as_object_mut() { + obj.insert("warnings".to_string(), json!(warnings)); + } + } + + (StatusCode::OK, Json(response)).into_response() +} + +pub async fn get_conversation_item( + conversation_storage: &Arc, + item_storage: &Arc, + conv_id: &str, + item_id: &str, + _include: Option>, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + let item_id = ConversationItemId::from(item_id); + + if let Err(response) = ensure_conversation_exists(conversation_storage, &conversation_id).await + { + return response; + } + + let is_linked = match item_storage + .is_item_linked(&conversation_id, &item_id) + .await + { + Ok(linked) => linked, + Err(e) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ + "error": format!("Failed to check item link: {}", e) + })), + ) + .into_response(); + } + }; + + if !is_linked { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Item not found in this conversation"})), + ) + .into_response(); + } + + match item_storage.get_item(&item_id).await { + Ok(Some(item)) => (StatusCode::OK, Json(item_to_json(&item))).into_response(), + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(json!({"error": "Item not found"})), + ) + .into_response(), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to get item: {}", e) })), + ) + .into_response(), + } +} + +pub async fn delete_conversation_item( + conversation_storage: &Arc, + item_storage: &Arc, + conv_id: &str, + item_id: &str, +) -> Response { + let conversation_id = ConversationId::from(conv_id); + let item_id = ConversationItemId::from(item_id); + + let conversation = + match ensure_conversation_exists(conversation_storage, &conversation_id).await { + Ok(conv) => conv, + Err(response) => return response, + }; + + match item_storage.delete_item(&conversation_id, &item_id).await { + Ok(_) => { + info!( + conversation_id = %conversation_id.0, + item_id = %item_id.0, + "Deleted conversation item" + ); + (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() + } + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({ "error": format!("Failed to delete item: {}", e) })), + ) + .into_response(), + } +} + +pub async fn create_and_link_item( + item_storage: &Arc, + conv_id_opt: Option<&ConversationId>, + mut new_item: NewConversationItem, +) -> Result<(), String> { + if new_item.status.is_none() { + new_item.status = Some("completed".to_string()); + } + + let created = item_storage + .create_item(new_item) + .await + .map_err(|e| format!("Failed to create item: {}", e))?; + + if let Some(conv_id) = conv_id_opt { + item_storage + .link_item(conv_id, &created.id, Utc::now()) + .await + .map_err(|e| format!("Failed to link item: {}", e))?; + + debug!( + conversation_id = %conv_id.0, + item_id = %created.id.0, + item_type = %created.item_type, + "Persisted conversation item and link" + ); + } else { + debug!( + item_id = %created.id.0, + item_type = %created.item_type, + "Persisted conversation item (no conversation link)" + ); + } + + Ok(()) +} + +fn parse_item_from_value( + item_val: &Value, +) -> Result<(NewConversationItem, Option), String> { + let item_type = item_val + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or("message"); + + if !SUPPORTED_ITEM_TYPES.contains(&item_type) { + return Err(format!( + "Unsupported item type '{}'. Supported types: {}", + item_type, + SUPPORTED_ITEM_TYPES.join(", ") + )); + } + + let warning = if !IMPLEMENTED_ITEM_TYPES.contains(&item_type) { + Some(format!( + "Item type '{}' is accepted but not yet implemented. \ + The item will be stored but may not function as expected.", + item_type + )) + } else { + None + }; + + let role = item_val + .get("role") + .and_then(|v| v.as_str()) + .map(String::from); + let status = item_val + .get("status") + .and_then(|v| v.as_str()) + .map(String::from) + .or_else(|| Some("completed".to_string())); + + if item_type == "message" && role.is_none() { + return Err("Message items require 'role' field".to_string()); + } + + let content = if item_type == "message" || item_type == "reasoning" { + item_val.get("content").cloned().unwrap_or(json!([])) + } else { + item_val.clone() + }; + + Ok(( + NewConversationItem { + id: None, + response_id: None, + item_type: item_type.to_string(), + role, + content, + status, + }, + warning, + )) +} + +pub fn item_to_json(item: &ConversationItem) -> Value { + let mut obj = serde_json::Map::new(); + obj.insert("id".to_string(), json!(item.id.0)); + obj.insert("type".to_string(), json!(item.item_type)); + + if let Some(role) = &item.role { + obj.insert("role".to_string(), json!(role)); + } + + // Map item types to their expected fields + let fields: Option<&[&str]> = match item.item_type.as_str() { + "mcp_call" => Some(&[ + "name", + "arguments", + "output", + "server_label", + "approval_request_id", + "error", + ]), + "mcp_list_tools" => Some(&["tools", "server_label"]), + "function_call" => Some(&["call_id", "name", "arguments", "output"]), + "function_call_output" => Some(&["call_id", "output"]), + _ => None, + }; + + if let Some(fields) = fields { + if let Some(content_obj) = item.content.as_object() { + for field in fields { + if let Some(value) = content_obj.get(*field) { + obj.insert((*field).to_string(), value.clone()); + } + } + } + } else { + obj.insert("content".to_string(), item.content.clone()); + } + + if let Some(status) = &item.status { + obj.insert("status".to_string(), json!(status)); + } + + Value::Object(obj) +} + +pub fn conversation_to_json(conversation: &Conversation) -> Value { + let mut obj = json!({ + "id": conversation.id.0, + "object": "conversation", + "created_at": conversation.created_at.timestamp() + }); + + if let Some(metadata) = &conversation.metadata { + if !metadata.is_empty() { + obj["metadata"] = Value::Object(metadata.clone()); + } + } + + obj +} diff --git a/sgl-router/src/routers/conversations/mod.rs b/sgl-router/src/routers/conversations/mod.rs new file mode 100644 index 000000000000..219547dfd415 --- /dev/null +++ b/sgl-router/src/routers/conversations/mod.rs @@ -0,0 +1,8 @@ +//! Shared conversation management module. +//! +//! This module provides conversation CRUD operations that can be shared +//! across different router implementations. + +mod handlers; + +pub use handlers::*; diff --git a/sgl-router/src/routers/mod.rs b/sgl-router/src/routers/mod.rs index aaa963468fc6..9ee7aaf0db61 100644 --- a/sgl-router/src/routers/mod.rs +++ b/sgl-router/src/routers/mod.rs @@ -21,11 +21,12 @@ use crate::protocols::{ responses::{ResponsesGetParams, ResponsesRequest}, }; +pub mod conversations; pub mod factory; pub mod grpc; pub mod header_utils; pub mod http; -pub mod openai; // New refactored OpenAI router module +pub mod openai; pub mod router_manager; pub use factory::RouterFactory; @@ -141,15 +142,24 @@ pub trait RouterTrait: Send + Sync + Debug { model_id: Option<&str>, ) -> Response; - // Conversations API + /// Get router type name + fn router_type(&self) -> &'static str; + + /// Check if this is a PD router + fn is_pd_mode(&self) -> bool { + self.router_type() == "pd" + } + + /// Create a new conversation async fn create_conversation(&self, _headers: Option<&HeaderMap>, _body: &Value) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversations create endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } + /// Get a conversation by ID async fn get_conversation( &self, _headers: Option<&HeaderMap>, @@ -157,11 +167,12 @@ pub trait RouterTrait: Send + Sync + Debug { ) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversations get endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } + /// Update a conversation async fn update_conversation( &self, _headers: Option<&HeaderMap>, @@ -170,11 +181,12 @@ pub trait RouterTrait: Send + Sync + Debug { ) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversations update endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } + /// Delete a conversation async fn delete_conversation( &self, _headers: Option<&HeaderMap>, @@ -182,23 +194,23 @@ pub trait RouterTrait: Send + Sync + Debug { ) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversations delete endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } - /// List items for a conversation + /// List items in a conversation async fn list_conversation_items( &self, _headers: Option<&HeaderMap>, _conversation_id: &str, _limit: Option, - _order: Option, - _after: Option, + _order: Option<&str>, + _after: Option<&str>, ) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversation items list endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } @@ -212,13 +224,12 @@ pub trait RouterTrait: Send + Sync + Debug { ) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversation items create endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } - /// Get a single conversation item - /// The `include` parameter is accepted but not yet implemented + /// Get a specific item from a conversation async fn get_conversation_item( &self, _headers: Option<&HeaderMap>, @@ -228,12 +239,12 @@ pub trait RouterTrait: Send + Sync + Debug { ) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversation item get endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } - /// Delete a conversation item + /// Delete an item from a conversation async fn delete_conversation_item( &self, _headers: Option<&HeaderMap>, @@ -242,16 +253,8 @@ pub trait RouterTrait: Send + Sync + Debug { ) -> Response { ( StatusCode::NOT_IMPLEMENTED, - "Conversation item delete endpoint not implemented", + "Conversations not supported by this router", ) .into_response() } - - /// Get router type name - fn router_type(&self) -> &'static str; - - /// Check if this is a PD router - fn is_pd_mode(&self) -> bool { - self.router_type() == "pd" - } } diff --git a/sgl-router/src/routers/openai/conversations.rs b/sgl-router/src/routers/openai/conversations.rs index 44332f228ef0..838fa7ce72a7 100644 --- a/sgl-router/src/routers/openai/conversations.rs +++ b/sgl-router/src/routers/openai/conversations.rs @@ -1,1098 +1,89 @@ -//! Conversation CRUD operations and persistence +//! Conversation operations for OpenAI router +//! +//! Re-exports shared CRUD handlers and provides OpenAI-specific persistence logic. -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; -use axum::{ - http::StatusCode, - response::{IntoResponse, Response}, - Json, -}; -use chrono::Utc; use serde_json::{json, Value}; -use tracing::{debug, info, warn}; +use tracing::{info, warn}; use super::responses::build_stored_response; +// Re-export shared conversation handlers for backward compatibility +pub use crate::routers::conversations::{ + conversation_to_json, create_and_link_item, create_conversation, create_conversation_items, + delete_conversation, delete_conversation_item, get_conversation, get_conversation_item, + item_to_json, list_conversation_items, update_conversation, MAX_METADATA_PROPERTIES, +}; use crate::{ data_connector::{ - Conversation, ConversationId, ConversationItemId, ConversationItemStorage, - ConversationStorage, ListParams, NewConversation, NewConversationItem, ResponseId, - ResponseStorage, SortOrder, + ConversationId, ConversationItemId, ConversationItemStorage, ConversationStorage, + NewConversationItem, ResponseId, ResponseStorage, }, protocols::responses::{generate_id, ResponseInput, ResponsesRequest}, }; -/// Maximum number of properties allowed in conversation metadata -pub(crate) const MAX_METADATA_PROPERTIES: usize = 16; - -// ============================================================================ -// Conversation CRUD Operations -// ============================================================================ - -/// Create a new conversation -pub(super) async fn create_conversation( - conversation_storage: &Arc, - body: Value, -) -> Response { - // TODO: The validation should be done in the right place - let metadata = match body.get("metadata") { - Some(Value::Object(map)) => { - if map.len() > MAX_METADATA_PROPERTIES { - return ( - StatusCode::BAD_REQUEST, - Json(json!({ - "error": - format!( - "metadata cannot have more than {} properties", - MAX_METADATA_PROPERTIES - ) - })), - ) - .into_response(); - } - Some(map.clone()) - } - Some(_) => { - return ( - StatusCode::BAD_REQUEST, - Json(json!({"error": "metadata must be an object"})), - ) - .into_response(); - } - None => None, - }; - - let new_conv = NewConversation { - id: None, // Generate random ID (OpenAI behavior for POST /v1/conversations) - metadata, - }; - - match conversation_storage.create_conversation(new_conv).await { - Ok(conversation) => { - info!(conversation_id = %conversation.id.0, "Created conversation"); - (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to create conversation: {}", e) - })), - ) - .into_response(), - } -} - -/// Get a conversation by ID -pub(super) async fn get_conversation( - conversation_storage: &Arc, - conv_id: &str, -) -> Response { - let conversation_id = ConversationId::from(conv_id); - - match conversation_storage - .get_conversation(&conversation_id) - .await - { - Ok(Some(conversation)) => { - (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() - } - Ok(None) => ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get conversation: {}", e) - })), - ) - .into_response(), - } -} - -/// Update a conversation's metadata -pub(super) async fn update_conversation( - conversation_storage: &Arc, - conv_id: &str, - body: Value, -) -> Response { - let conversation_id = ConversationId::from(conv_id); - - let current_meta = match conversation_storage - .get_conversation(&conversation_id) - .await - { - Ok(Some(meta)) => meta, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get conversation: {}", e) - })), - ) - .into_response(); - } - }; - - #[derive(Debug)] - enum Patch { - Set(String, Value), - Delete(String), - } - - let mut patches: Vec = Vec::new(); - - if let Some(metadata_val) = body.get("metadata") { - if let Some(map) = metadata_val.as_object() { - for (k, v) in map { - if v.is_null() { - patches.push(Patch::Delete(k.clone())); - } else { - patches.push(Patch::Set(k.clone(), v.clone())); - } - } - } else { - return ( - StatusCode::BAD_REQUEST, - Json(json!({"error": "metadata must be an object"})), - ) - .into_response(); - } - } - - let mut new_metadata = current_meta.metadata.clone().unwrap_or_default(); - for patch in patches { - match patch { - Patch::Set(k, v) => { - new_metadata.insert(k, v); - } - Patch::Delete(k) => { - new_metadata.remove(&k); - } - } - } - - if new_metadata.len() > MAX_METADATA_PROPERTIES { - return ( - StatusCode::BAD_REQUEST, - Json(json!({ - "error": - format!( - "metadata cannot have more than {} properties", - MAX_METADATA_PROPERTIES - ) - })), - ) - .into_response(); - } - - let final_metadata = if new_metadata.is_empty() { - None - } else { - Some(new_metadata) - }; - - match conversation_storage - .update_conversation(&conversation_id, final_metadata) - .await - { - Ok(Some(conversation)) => { - info!(conversation_id = %conversation_id.0, "Updated conversation"); - (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() - } - Ok(None) => ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to update conversation: {}", e) - })), - ) - .into_response(), - } -} - -/// Delete a conversation -pub(super) async fn delete_conversation( - conversation_storage: &Arc, - conv_id: &str, -) -> Response { - let conversation_id = ConversationId::from(conv_id); - - match conversation_storage - .get_conversation(&conversation_id) - .await - { - Ok(Some(_)) => {} - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get conversation: {}", e) - })), - ) - .into_response(); - } - } - - match conversation_storage - .delete_conversation(&conversation_id) - .await - { - Ok(_) => { - info!(conversation_id = %conversation_id.0, "Deleted conversation"); - ( - StatusCode::OK, - Json(json!({ - "id": conversation_id.0, - "object": "conversation.deleted", - "deleted": true - })), - ) - .into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to delete conversation: {}", e) - })), - ) - .into_response(), - } -} - -/// List items in a conversation with pagination -pub(super) async fn list_conversation_items( - conversation_storage: &Arc, - item_storage: &Arc, - conv_id: &str, - query_params: HashMap, -) -> Response { - let conversation_id = ConversationId::from(conv_id); - - match conversation_storage - .get_conversation(&conversation_id) - .await - { - Ok(Some(_)) => {} - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get conversation: {}", e) - })), - ) - .into_response(); - } - } - - let limit: usize = query_params - .get("limit") - .and_then(|s| s.parse().ok()) - .unwrap_or(100); - - let after = query_params.get("after").map(|s| s.to_string()); - - // Default to descending order (most recent first) - let order = query_params - .get("order") - .and_then(|s| match s.as_str() { - "asc" => Some(SortOrder::Asc), - "desc" => Some(SortOrder::Desc), - _ => None, - }) - .unwrap_or(SortOrder::Desc); - - let params = ListParams { - limit, - order, - after, - }; - - match item_storage.list_items(&conversation_id, params).await { - Ok(items) => { - let item_values: Vec = items - .iter() - .map(|item| { - let mut item_json = item_to_json(item); - // Add created_at field for list view - if let Some(obj) = item_json.as_object_mut() { - obj.insert("created_at".to_string(), json!(item.created_at)); - } - item_json - }) - .collect(); - - let has_more = items.len() == limit; - let last_id = items.last().map(|item| item.id.0.clone()); - - ( - StatusCode::OK, - Json(json!({ - "object": "list", - "data": item_values, - "has_more": has_more, - "first_id": items.first().map(|item| &item.id.0), - "last_id": last_id, - })), - ) - .into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Failed to list items: {}", e) })), - ) - .into_response(), - } -} - // ============================================================================ -// Conversation Item Operations +// Persistence Operations (OpenAI-specific) // ============================================================================ -/// Supported item types for creation -/// Types marked as "implemented" are fully supported -/// Types marked as "accepted" are stored but return not-implemented warnings -const SUPPORTED_ITEM_TYPES: &[&str] = &[ - // Fully implemented types - "message", - "reasoning", - "mcp_list_tools", - "mcp_call", - "item_reference", - "function_call", - "function_call_output", - // Accepted but not yet implemented (stored, warning returned) - "file_search_call", - "computer_call", - "computer_call_output", - "web_search_call", - "image_generation_call", - "code_interpreter_call", - "local_shell_call", - "local_shell_call_output", - "mcp_approval_request", - "mcp_approval_response", - "custom_tool_call", - "custom_tool_call_output", -]; - -/// Item types that are fully implemented with business logic -const IMPLEMENTED_ITEM_TYPES: &[&str] = &[ - "message", - "reasoning", - "mcp_list_tools", - "mcp_call", - "item_reference", -]; - -/// Create items in a conversation (bulk operation) -pub(super) async fn create_conversation_items( - conversation_storage: &Arc, - item_storage: &Arc, - conv_id: &str, - body: Value, -) -> Response { - let conversation_id = ConversationId::from(conv_id); - - // Verify conversation exists - match conversation_storage - .get_conversation(&conversation_id) - .await - { - Ok(Some(_)) => {} - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get conversation: {}", e) - })), - ) - .into_response(); - } - } - - // Parse items array from request - let items_array = match body.get("items").and_then(|v| v.as_array()) { - Some(arr) => arr, - None => { - return ( - StatusCode::BAD_REQUEST, - Json(json!({"error": "Missing or invalid 'items' field"})), - ) - .into_response(); - } - }; - - // Validate limit (max 20 items per OpenAI spec) - if items_array.len() > 20 { - return ( - StatusCode::BAD_REQUEST, - Json(json!({"error": "Cannot add more than 20 items at a time"})), - ) - .into_response(); - } - - // Convert and create items - let mut created_items = Vec::new(); - let mut warnings = Vec::new(); - let added_at = Utc::now(); - - for item_val in items_array { - let item_type = item_val - .get("type") - .and_then(|v| v.as_str()) - .unwrap_or("message"); - - // Handle item_reference specially - link existing item instead of creating new - if item_type == "item_reference" { - let ref_id = match item_val.get("id").and_then(|v| v.as_str()) { - Some(id) => id, - None => { - return ( - StatusCode::BAD_REQUEST, - Json(json!({"error": "item_reference requires 'id' field"})), - ) - .into_response(); - } - }; - - let existing_item_id = ConversationItemId::from(ref_id); - - // Retrieve the existing item - let existing_item = match item_storage.get_item(&existing_item_id).await { - Ok(Some(item)) => item, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({ - "error": format!("Referenced item '{}' not found", ref_id) - })), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get referenced item: {}", e) - })), - ) - .into_response(); - } - }; - - // Link existing item to this conversation - if let Err(e) = item_storage - .link_item(&conversation_id, &existing_item.id, added_at) - .await - { - warn!("Failed to link item {}: {}", existing_item.id.0, e); - } - - created_items.push(item_to_json(&existing_item)); - continue; - } - - // Check if user provided an ID - let user_provided_id = item_val.get("id").and_then(|v| v.as_str()); - - let item = if let Some(id_str) = user_provided_id { - // User provided an ID - check if it already exists in DB - let item_id = ConversationItemId::from(id_str); - - // First check if this item is already linked to this conversation - let is_already_linked = match item_storage - .is_item_linked(&conversation_id, &item_id) - .await - { - Ok(linked) => linked, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to check item link: {}", e) - })), - ) - .into_response(); - } - }; - - if is_already_linked { - // Item already linked to this conversation - return error - return ( - StatusCode::BAD_REQUEST, - Json(json!({ - "error": { - "message": "Item already in conversation", - "type": "invalid_request_error", - "param": "items", - "code": "item_already_in_conversation" - } - })), - ) - .into_response(); - } - - // Check if item exists in DB - let existing_item = match item_storage.get_item(&item_id).await { - Ok(Some(item)) => item, - Ok(None) => { - // Item doesn't exist in DB, create new one with user-provided content - let (new_item, warning) = match parse_item_from_value(item_val) { - Ok((mut item, warn)) => { - // Use the user-provided ID - item.id = Some(item_id.clone()); - (item, warn) - } - Err(e) => { - return ( - StatusCode::BAD_REQUEST, - Json(json!({ "error": format!("Invalid item: {}", e) })), - ) - .into_response(); - } - }; - - // Collect warnings for not-implemented types - if let Some(w) = warning { - warnings.push(w); - } - - // Create item with provided ID - match item_storage.create_item(new_item).await { - Ok(item) => item, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Failed to create item: {}", e) })), - ) - .into_response(); - } - } - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to check item existence: {}", e) - })), - ) - .into_response(); - } - }; - - existing_item - } else { - // No ID provided - parse and create new item normally - let (new_item, warning) = match parse_item_from_value(item_val) { - Ok((item, warn)) => (item, warn), - Err(e) => { - return ( - StatusCode::BAD_REQUEST, - Json(json!({ "error": format!("Invalid item: {}", e) })), - ) - .into_response(); - } - }; - - // Collect warnings for not-implemented types - if let Some(w) = warning { - warnings.push(w); - } - - // Create item - match item_storage.create_item(new_item).await { - Ok(item) => item, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Failed to create item: {}", e) })), - ) - .into_response(); - } - } - }; - - // Link to conversation - if let Err(e) = item_storage - .link_item(&conversation_id, &item.id, added_at) - .await - { - warn!("Failed to link item {}: {}", item.id.0, e); - } - - created_items.push(item_to_json(&item)); - } - - // Build response matching OpenAI format - let first_id = created_items.first().and_then(|v| v.get("id")); - let last_id = created_items.last().and_then(|v| v.get("id")); - - let mut response = json!({ - "object": "list", - "data": created_items, - "first_id": first_id, - "last_id": last_id, - "has_more": false - }); - - // Add warnings if any not-implemented types were used - if !warnings.is_empty() { - if let Some(obj) = response.as_object_mut() { - obj.insert("warnings".to_string(), json!(warnings)); - } - } - - (StatusCode::OK, Json(response)).into_response() -} - -/// Get a single conversation item -/// Note: `include` query parameter is accepted but not yet implemented -pub(super) async fn get_conversation_item( - conversation_storage: &Arc, - item_storage: &Arc, - conv_id: &str, - item_id: &str, - _include: Option>, // Reserved for future use -) -> Response { - let conversation_id = ConversationId::from(conv_id); - let item_id = ConversationItemId::from(item_id); - - // Verify conversation exists - match conversation_storage - .get_conversation(&conversation_id) - .await - { - Ok(Some(_)) => {} - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get conversation: {}", e) - })), - ) - .into_response(); - } - } - - // First check if the item is linked to this conversation - let is_linked = match item_storage - .is_item_linked(&conversation_id, &item_id) - .await - { - Ok(linked) => linked, - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to check item link: {}", e) - })), - ) - .into_response(); - } - }; - - if !is_linked { - return ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Item not found in this conversation"})), - ) - .into_response(); - } - - // Get the item - match item_storage.get_item(&item_id).await { - Ok(Some(item)) => { - // TODO: Process `include` parameter when implemented - // Example: include=["metadata", "timestamps"] - (StatusCode::OK, Json(item_to_json(&item))).into_response() - } - Ok(None) => ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Item not found"})), - ) - .into_response(), - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Failed to get item: {}", e) })), - ) - .into_response(), - } -} - -/// Delete a conversation item -pub(super) async fn delete_conversation_item( - conversation_storage: &Arc, - item_storage: &Arc, - conv_id: &str, - item_id: &str, -) -> Response { - let conversation_id = ConversationId::from(conv_id); - let item_id = ConversationItemId::from(item_id); - - // Verify conversation exists and get it for response - let conversation = match conversation_storage - .get_conversation(&conversation_id) - .await - { - Ok(Some(conv)) => conv, - Ok(None) => { - return ( - StatusCode::NOT_FOUND, - Json(json!({"error": "Conversation not found"})), - ) - .into_response(); - } - Err(e) => { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ - "error": format!("Failed to get conversation: {}", e) - })), - ) - .into_response(); - } - }; - - // Delete the item - match item_storage.delete_item(&conversation_id, &item_id).await { - Ok(_) => { - info!( - conversation_id = %conversation_id.0, - item_id = %item_id.0, - "Deleted conversation item" - ); - - // Return updated conversation object (per OpenAI spec) - (StatusCode::OK, Json(conversation_to_json(&conversation))).into_response() - } - Err(e) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({ "error": format!("Failed to delete item: {}", e) })), - ) - .into_response(), - } -} - -/// Parse NewConversationItem from Value -/// Returns (NewConversationItem, Option) -/// Supports three top-level structures: -/// 1. Input message: {"type": "message", "role": "...", "content": [...]} -/// 2. Item: {"type": "message|function_tool_call|...", ...} -/// 3. Item reference: {"type": "item_reference", "id": "..."} -fn parse_item_from_value( - item_val: &Value, -) -> Result<(NewConversationItem, Option), String> { - // Detect structure type - let item_type = item_val - .get("type") - .and_then(|v| v.as_str()) - .unwrap_or("message"); - - // Validate item type is supported - if !SUPPORTED_ITEM_TYPES.contains(&item_type) { - return Err(format!( - "Unsupported item type '{}'. Supported types: {}", - item_type, - SUPPORTED_ITEM_TYPES.join(", ") - )); - } - - // Check if type is implemented or just accepted - let warning = if !IMPLEMENTED_ITEM_TYPES.contains(&item_type) { - Some(format!( - "Item type '{}' is accepted but not yet implemented. \ - The item will be stored but may not function as expected.", - item_type - )) - } else { - None - }; - - // Parse common fields - let role = item_val - .get("role") - .and_then(|v| v.as_str()) - .map(String::from); - let status = item_val - .get("status") - .and_then(|v| v.as_str()) - .map(String::from) - .or_else(|| Some("completed".to_string())); // Default status - - // Validate message types have role - if item_type == "message" && role.is_none() { - return Err("Message items require 'role' field".to_string()); - } - - // For special types (mcp_call, function_tool_call, etc.), store the entire item_val as content - // For message types, use the content field directly - let content = if item_type == "message" || item_type == "reasoning" { - item_val.get("content").cloned().unwrap_or(json!([])) - } else { - // Store entire item for extraction later - item_val.clone() - }; - - Ok(( - NewConversationItem { - id: None, - response_id: None, - item_type: item_type.to_string(), - role, - content, - status, - }, - warning, - )) -} - -/// Convert ConversationItem to JSON response format -/// Extracts fields from content for special types (mcp_call, mcp_list_tools, etc.) -fn item_to_json(item: &crate::data_connector::ConversationItem) -> Value { - let mut obj = serde_json::Map::new(); - obj.insert("id".to_string(), json!(item.id.0)); - obj.insert("type".to_string(), json!(item.item_type)); - - if let Some(role) = &item.role { - obj.insert("role".to_string(), json!(role)); - } - - // Handle special item types that need field extraction from content - match item.item_type.as_str() { - "mcp_call" => { - // Extract mcp_call fields: name, arguments, output, server_label, approval_request_id, error - if let Some(content_obj) = item.content.as_object() { - if let Some(name) = content_obj.get("name") { - obj.insert("name".to_string(), name.clone()); - } - if let Some(arguments) = content_obj.get("arguments") { - obj.insert("arguments".to_string(), arguments.clone()); - } - if let Some(output) = content_obj.get("output") { - obj.insert("output".to_string(), output.clone()); - } - if let Some(server_label) = content_obj.get("server_label") { - obj.insert("server_label".to_string(), server_label.clone()); - } - if let Some(approval_request_id) = content_obj.get("approval_request_id") { - obj.insert( - "approval_request_id".to_string(), - approval_request_id.clone(), - ); - } - if let Some(error) = content_obj.get("error") { - obj.insert("error".to_string(), error.clone()); - } - } - } - "mcp_list_tools" => { - // Extract mcp_list_tools fields: tools, server_label - if let Some(content_obj) = item.content.as_object() { - if let Some(tools) = content_obj.get("tools") { - obj.insert("tools".to_string(), tools.clone()); - } - if let Some(server_label) = content_obj.get("server_label") { - obj.insert("server_label".to_string(), server_label.clone()); - } - } - } - "function_call" => { - // Extract function_call fields: call_id, name, arguments, output - if let Some(content_obj) = item.content.as_object() { - for field in ["call_id", "name", "arguments", "output"] { - if let Some(value) = content_obj.get(field) { - obj.insert(field.to_string(), value.clone()); - } - } - } - } - "function_call_output" => { - // Extract function_call_output fields: call_id, output - if let Some(content_obj) = item.content.as_object() { - for field in ["call_id", "output"] { - if let Some(value) = content_obj.get(field) { - obj.insert(field.to_string(), value.clone()); - } - } - } - } - _ => { - // For all other types (message, reasoning, etc.), keep content as-is - obj.insert("content".to_string(), item.content.clone()); - } - } - - if let Some(status) = &item.status { - obj.insert("status".to_string(), json!(status)); - } - - Value::Object(obj) -} - -// ============================================================================ -// Persistence Operations -// ============================================================================ - -/// Persist conversation items (delegates to persist_items_with_storages) -pub async fn persist_conversation_items( - conversation_storage: Arc, - item_storage: Arc, - response_storage: Arc, - response_json: &Value, - original_body: &ResponsesRequest, -) -> Result<(), String> { - persist_items_with_storages( - conversation_storage, - item_storage, - response_storage, - response_json, - original_body, - ) - .await -} - -/// Helper function to create and optionally link a conversation item -/// If conv_id is None, only creates the item without linking -async fn create_and_link_item( - item_storage: &Arc, - conv_id_opt: Option<&ConversationId>, - mut new_item: NewConversationItem, -) -> Result<(), String> { - // Set default status if not provided - if new_item.status.is_none() { - new_item.status = Some("completed".to_string()); - } - - // Step 1: Create the item - let created = item_storage - .create_item(new_item) - .await - .map_err(|e| format!("Failed to create item: {}", e))?; - - // Step 2: Link it to the conversation (if provided) - if let Some(conv_id) = conv_id_opt { - item_storage - .link_item(conv_id, &created.id, Utc::now()) - .await - .map_err(|e| format!("Failed to link item: {}", e))?; - - debug!( - conversation_id = %conv_id.0, - item_id = %created.id.0, - item_type = %created.item_type, - "Persisted conversation item and link" - ); - } else { - debug!( - item_id = %created.id.0, - item_type = %created.item_type, - "Persisted conversation item (no conversation link)" - ); - } - - Ok(()) -} - -/// Persist conversation items with all storages +/// Persist conversation items to storage /// /// This function: /// 1. Extracts and normalizes input items from the request /// 2. Extracts output items from the response /// 3. Stores ALL items in response storage (always) /// 4. If conversation provided, also links items to conversation -async fn persist_items_with_storages( +pub async fn persist_conversation_items( conversation_storage: Arc, item_storage: Arc, response_storage: Arc, response_json: &Value, original_body: &ResponsesRequest, ) -> Result<(), String> { - // Step 1: Extract response ID + // Extract response ID let response_id_str = response_json .get("id") .and_then(|v| v.as_str()) .ok_or_else(|| "Response missing id field".to_string())?; let response_id = ResponseId::from(response_id_str); - // Step 2: Parse and normalize input items from request + // Parse and normalize input items from request let input_items = extract_input_items(&original_body.input)?; - // Step 3: Parse output items from response - let output_items = extract_output_items(response_json)?; + // Parse output items from response + let output_items = response_json + .get("output") + .and_then(|v| v.as_array()) + .cloned() + .ok_or_else(|| "No output array in response".to_string())?; - // Step 4: Build StoredResponse with input and output as JSON arrays + // Build and store response let mut stored_response = build_stored_response(response_json, original_body); stored_response.id = response_id.clone(); stored_response.input = Value::Array(input_items.clone()); stored_response.output = Value::Array(output_items.clone()); - // Step 5: Store response (ALWAYS, regardless of conversation) response_storage .store_response(stored_response) .await .map_err(|e| format!("Failed to store response: {}", e))?; - // Step 6: Check if conversation is provided and validate it - let conv_id_opt = match &original_body.conversation { - Some(id) => { - let conv_id = ConversationId::from(id.as_str()); - // Verify conversation exists - if conversation_storage - .get_conversation(&conv_id) - .await - .map_err(|e| format!("Failed to get conversation: {}", e))? - .is_none() - { + // Check if conversation is provided and validate it exists + let conv_id_opt = if let Some(id) = &original_body.conversation { + let conv_id = ConversationId::from(id.as_str()); + match conversation_storage.get_conversation(&conv_id).await { + Ok(Some(_)) => Some(conv_id), + Ok(None) => { warn!(conversation_id = %conv_id.0, "Conversation not found, skipping item linking"); - None // Conversation doesn't exist, items already stored in response - } else { - Some(conv_id) + None } + Err(e) => return Err(format!("Failed to get conversation: {}", e)), } - None => None, // No conversation provided, items already stored in response + } else { + None }; - // Step 7: If conversation exists, link items to it + // If conversation exists, link items to it if let Some(conv_id) = conv_id_opt { link_items_to_conversation( &item_storage, @@ -1102,7 +93,6 @@ async fn persist_items_with_storages( response_id_str, ) .await?; - info!( conversation_id = %conv_id.0, response_id = %response_id.0, @@ -1200,16 +190,53 @@ fn extract_input_items(input: &ResponseInput) -> Result, String> { Ok(items) } -/// Extract ALL output items from response JSON -fn extract_output_items(response_json: &Value) -> Result, String> { - response_json - .get("output") - .and_then(|v| v.as_array()) - .cloned() - .ok_or_else(|| "No output array in response".to_string()) +/// Convert a JSON item to NewConversationItem +/// +/// For input items: function_call/function_call_output store whole item as content +/// For output items: message extracts content field, others store whole item +fn item_to_new_conversation_item( + item_value: &Value, + response_id: Option, + is_input: bool, +) -> NewConversationItem { + let item_type = item_value + .get("type") + .and_then(|v| v.as_str()) + .unwrap_or("message"); + + // Determine if we should store the whole item or just the content field + let store_whole_item = if is_input { + item_type == "function_call" || item_type == "function_call_output" + } else { + item_type != "message" + }; + + let content = if store_whole_item { + item_value.clone() + } else { + item_value.get("content").cloned().unwrap_or(json!([])) + }; + + NewConversationItem { + id: item_value + .get("id") + .and_then(|v| v.as_str()) + .map(ConversationItemId::from), + response_id, + item_type: item_type.to_string(), + role: item_value + .get("role") + .and_then(|v| v.as_str()) + .map(String::from), + content, + status: item_value + .get("status") + .and_then(|v| v.as_str()) + .map(String::from), + } } -/// Link ALL input and output items to a conversation +/// Link all input and output items to a conversation async fn link_items_to_conversation( item_storage: &Arc, conv_id: &ConversationId, @@ -1219,120 +246,15 @@ async fn link_items_to_conversation( ) -> Result<(), String> { let response_id_opt = Some(response_id.to_string()); - // Link ALL input items (no filtering by type) - for input_item_value in input_items { - let item_type = input_item_value - .get("type") - .and_then(|v| v.as_str()) - .unwrap_or("message"); - let role = input_item_value - .get("role") - .and_then(|v| v.as_str()) - .map(String::from); - - // For function_call and function_call_output, store the entire item as content - // For message types, extract just the content field - let content = if item_type == "function_call" || item_type == "function_call_output" { - input_item_value.clone() - } else { - input_item_value - .get("content") - .cloned() - .unwrap_or(json!([])) - }; - - let status = input_item_value - .get("status") - .and_then(|v| v.as_str()) - .map(String::from); - - // Extract the original item ID from input if present - let item_id = input_item_value - .get("id") - .and_then(|v| v.as_str()) - .map(ConversationItemId::from); - - let new_item = NewConversationItem { - id: item_id, // Preserve ID if present - response_id: response_id_opt.clone(), - item_type: item_type.to_string(), - role, - content, - status, - }; - + for item in input_items { + let new_item = item_to_new_conversation_item(item, response_id_opt.clone(), true); create_and_link_item(item_storage, Some(conv_id), new_item).await?; } - // Link ALL output items (no filtering by type) - // Store reasoning, function_tool_call, mcp_call, and any other types - for output_item_value in output_items { - let item_type = output_item_value - .get("type") - .and_then(|v| v.as_str()) - .unwrap_or("message"); - let role = output_item_value - .get("role") - .and_then(|v| v.as_str()) - .map(String::from); - let status = output_item_value - .get("status") - .and_then(|v| v.as_str()) - .map(String::from); - - // Extract the original item ID from the response - let item_id = output_item_value - .get("id") - .and_then(|v| v.as_str()) - .map(ConversationItemId::from); - - // For non-message types, store the entire item as content - // For message types, extract just the content field - let content = if item_type == "message" { - output_item_value - .get("content") - .cloned() - .unwrap_or(json!([])) - } else { - // For other types (reasoning, function_call, function_call_output, mcp_call, etc.) - // store the entire item structure - output_item_value.clone() - }; - - let new_item = NewConversationItem { - id: item_id, // Preserve ID if present - response_id: response_id_opt.clone(), - item_type: item_type.to_string(), - role, - content, - status, - }; - + for item in output_items { + let new_item = item_to_new_conversation_item(item, response_id_opt.clone(), false); create_and_link_item(item_storage, Some(conv_id), new_item).await?; } Ok(()) } - -// ============================================================================ -// Helper Functions -// ============================================================================ - -/// Convert conversation to JSON response -pub(crate) fn conversation_to_json(conversation: &Conversation) -> Value { - let mut response = json!({ - "id": conversation.id.0, - "object": "conversation", - "created_at": conversation.created_at.timestamp() - }); - - if let Some(metadata) = &conversation.metadata { - if !metadata.is_empty() { - if let Some(obj) = response.as_object_mut() { - obj.insert("metadata".to_string(), Value::Object(metadata.clone())); - } - } - } - - response -} diff --git a/sgl-router/src/routers/openai/router.rs b/sgl-router/src/routers/openai/router.rs index 0c4c6aba9b08..80a08e90ffc9 100644 --- a/sgl-router/src/routers/openai/router.rs +++ b/sgl-router/src/routers/openai/router.rs @@ -22,11 +22,7 @@ use super::{ ComponentRefs, PayloadState, RequestContext, ResponsesComponents, SharedComponents, WorkerSelection, }, - conversations::{ - create_conversation, create_conversation_items, delete_conversation, - delete_conversation_item, get_conversation, get_conversation_item, list_conversation_items, - persist_conversation_items, update_conversation, - }, + conversations::persist_conversation_items, mcp::{ ensure_request_mcp_client, execute_tool_loop, prepare_mcp_payload_for_streaming, McpLoopConfig, @@ -1111,8 +1107,16 @@ impl crate::routers::RouterTrait for OpenAIRouter { (StatusCode::NOT_IMPLEMENTED, "Rerank not supported").into_response() } + fn router_type(&self) -> &'static str { + "openai" + } + + // ============================================================================ + // Conversation API Methods - delegate to conversations module + // ============================================================================ + async fn create_conversation(&self, _headers: Option<&HeaderMap>, body: &Value) -> Response { - create_conversation( + super::conversations::create_conversation( &self.responses_components.conversation_storage, body.clone(), ) @@ -1124,7 +1128,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { _headers: Option<&HeaderMap>, conversation_id: &str, ) -> Response { - get_conversation( + super::conversations::get_conversation( &self.responses_components.conversation_storage, conversation_id, ) @@ -1137,7 +1141,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { conversation_id: &str, body: &Value, ) -> Response { - update_conversation( + super::conversations::update_conversation( &self.responses_components.conversation_storage, conversation_id, body.clone(), @@ -1150,7 +1154,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { _headers: Option<&HeaderMap>, conversation_id: &str, ) -> Response { - delete_conversation( + super::conversations::delete_conversation( &self.responses_components.conversation_storage, conversation_id, ) @@ -1162,25 +1166,16 @@ impl crate::routers::RouterTrait for OpenAIRouter { _headers: Option<&HeaderMap>, conversation_id: &str, limit: Option, - order: Option, - after: Option, + order: Option<&str>, + after: Option<&str>, ) -> Response { - let mut query_params = std::collections::HashMap::new(); - query_params.insert("limit".to_string(), limit.unwrap_or(100).to_string()); - if let Some(after_val) = after { - if !after_val.is_empty() { - query_params.insert("after".to_string(), after_val); - } - } - if let Some(order_val) = order { - query_params.insert("order".to_string(), order_val); - } - - list_conversation_items( + super::conversations::list_conversation_items( &self.responses_components.conversation_storage, &self.responses_components.conversation_item_storage, conversation_id, - query_params, + limit, + order, + after, ) .await } @@ -1191,7 +1186,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { conversation_id: &str, body: &Value, ) -> Response { - create_conversation_items( + super::conversations::create_conversation_items( &self.responses_components.conversation_storage, &self.responses_components.conversation_item_storage, conversation_id, @@ -1207,7 +1202,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { item_id: &str, include: Option>, ) -> Response { - get_conversation_item( + super::conversations::get_conversation_item( &self.responses_components.conversation_storage, &self.responses_components.conversation_item_storage, conversation_id, @@ -1223,7 +1218,7 @@ impl crate::routers::RouterTrait for OpenAIRouter { conversation_id: &str, item_id: &str, ) -> Response { - delete_conversation_item( + super::conversations::delete_conversation_item( &self.responses_components.conversation_storage, &self.responses_components.conversation_item_storage, conversation_id, @@ -1231,8 +1226,4 @@ impl crate::routers::RouterTrait for OpenAIRouter { ) .await } - - fn router_type(&self) -> &'static str { - "openai" - } } diff --git a/sgl-router/src/routers/router_manager.rs b/sgl-router/src/routers/router_manager.rs index 65dff42be315..bac979f886fa 100644 --- a/sgl-router/src/routers/router_manager.rs +++ b/sgl-router/src/routers/router_manager.rs @@ -576,7 +576,14 @@ impl RouterTrait for RouterManager { } } - // Conversations API delegates + fn router_type(&self) -> &'static str { + "manager" + } + + // ============================================================================ + // Conversation API Methods - delegate to selected router + // ============================================================================ + async fn create_conversation(&self, headers: Option<&HeaderMap>, body: &Value) -> Response { let router = self.select_router_for_request(headers, None); if let Some(router) = router { @@ -658,8 +665,8 @@ impl RouterTrait for RouterManager { headers: Option<&HeaderMap>, conversation_id: &str, limit: Option, - order: Option, - after: Option, + order: Option<&str>, + after: Option<&str>, ) -> Response { let router = self.select_router_for_request(headers, None); if let Some(router) = router { @@ -670,7 +677,7 @@ impl RouterTrait for RouterManager { ( StatusCode::NOT_FOUND, format!( - "No router available to list conversation items for '{}'", + "No router available to list items for conversation '{}'", conversation_id ), ) @@ -693,7 +700,7 @@ impl RouterTrait for RouterManager { ( StatusCode::NOT_FOUND, format!( - "No router available to create conversation items for '{}'", + "No router available to create items for conversation '{}'", conversation_id ), ) @@ -717,7 +724,7 @@ impl RouterTrait for RouterManager { ( StatusCode::NOT_FOUND, format!( - "No router available to get conversation item '{}' in '{}'", + "No router available to get item '{}' from conversation '{}'", item_id, conversation_id ), ) @@ -740,17 +747,13 @@ impl RouterTrait for RouterManager { ( StatusCode::NOT_FOUND, format!( - "No router available to delete conversation item '{}' in '{}'", + "No router available to delete item '{}' from conversation '{}'", item_id, conversation_id ), ) .into_response() } } - - fn router_type(&self) -> &'static str { - "manager" - } } impl std::fmt::Debug for RouterManager { diff --git a/sgl-router/src/server.rs b/sgl-router/src/server.rs index 79e6c87f4e49..bbf946aa3a9d 100644 --- a/sgl-router/src/server.rs +++ b/sgl-router/src/server.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -322,16 +323,22 @@ struct ListItemsQuery { async fn v1_conversations_list_items( State(state): State>, Path(conversation_id): Path, + headers: http::HeaderMap, Query(ListItemsQuery { limit, order, after, }): Query, - headers: http::HeaderMap, ) -> Response { state .router - .list_conversation_items(Some(&headers), &conversation_id, limit, order, after) + .list_conversation_items( + Some(&headers), + &conversation_id, + limit, + order.as_deref(), + after.as_deref(), + ) .await } @@ -356,8 +363,8 @@ async fn v1_conversations_create_items( async fn v1_conversations_get_item( State(state): State>, Path((conversation_id, item_id)): Path<(String, String)>, - Query(query): Query, headers: http::HeaderMap, + Query(query): Query, ) -> Response { state .router @@ -545,7 +552,7 @@ async fn get_worker(State(state): State>, Path(url): Path) tool_parser: None, chat_template: None, bootstrap_port: None, - metadata: std::collections::HashMap::new(), + metadata: HashMap::new(), job_status: Some(status), }; return Json(worker_info).into_response(); diff --git a/sgl-router/tests/responses_api_test.rs b/sgl-router/tests/responses_api_test.rs index 31285868a7a4..e58c124cb73c 100644 --- a/sgl-router/tests/responses_api_test.rs +++ b/sgl-router/tests/responses_api_test.rs @@ -14,7 +14,10 @@ use common::{ mock_mcp_server::MockMCPServer, mock_worker::{HealthStatus, MockWorker, MockWorkerConfig, WorkerType}, }; -use sgl_model_gateway::{config::RouterConfig, routers::RouterFactory}; +use sgl_model_gateway::{ + config::RouterConfig, + routers::{conversations, RouterFactory}, +}; #[tokio::test] async fn test_non_streaming_mcp_minimal_e2e_with_persistence() { @@ -234,11 +237,12 @@ async fn test_conversations_crud_basic() { .build_unchecked(); let ctx = common::create_test_context(router_cfg).await; - let router = RouterFactory::create_router(&ctx).await.expect("router"); + let _router = RouterFactory::create_router(&ctx).await.expect("router"); // Create let create_body = serde_json::json!({ "metadata": { "project": "alpha" } }); - let create_resp = router.create_conversation(None, &create_body).await; + let create_resp = + conversations::create_conversation(&ctx.conversation_storage, create_body.clone()).await; assert_eq!(create_resp.status(), StatusCode::OK); let create_bytes = axum::body::to_bytes(create_resp.into_body(), usize::MAX) .await @@ -249,7 +253,7 @@ async fn test_conversations_crud_basic() { assert_eq!(create_json["object"], "conversation"); // Get - let get_resp = router.get_conversation(None, conv_id).await; + let get_resp = conversations::get_conversation(&ctx.conversation_storage, conv_id).await; assert_eq!(get_resp.status(), StatusCode::OK); let get_bytes = axum::body::to_bytes(get_resp.into_body(), usize::MAX) .await @@ -259,9 +263,9 @@ async fn test_conversations_crud_basic() { // Update (merge) let update_body = serde_json::json!({ "metadata": { "owner": "alice" } }); - let upd_resp = router - .update_conversation(None, conv_id, &update_body) - .await; + let upd_resp = + conversations::update_conversation(&ctx.conversation_storage, conv_id, update_body.clone()) + .await; assert_eq!(upd_resp.status(), StatusCode::OK); let upd_bytes = axum::body::to_bytes(upd_resp.into_body(), usize::MAX) .await @@ -271,7 +275,7 @@ async fn test_conversations_crud_basic() { assert_eq!(upd_json["metadata"]["owner"], serde_json::json!("alice")); // Delete - let del_resp = router.delete_conversation(None, conv_id).await; + let del_resp = conversations::delete_conversation(&ctx.conversation_storage, conv_id).await; assert_eq!(del_resp.status(), StatusCode::OK); let del_bytes = axum::body::to_bytes(del_resp.into_body(), usize::MAX) .await @@ -280,7 +284,7 @@ async fn test_conversations_crud_basic() { assert_eq!(del_json["deleted"], serde_json::json!(true)); // Get again -> 404 - let not_found = router.get_conversation(None, conv_id).await; + let not_found = conversations::get_conversation(&ctx.conversation_storage, conv_id).await; assert_eq!(not_found.status(), StatusCode::NOT_FOUND); } @@ -1236,11 +1240,12 @@ async fn test_conversation_items_create_and_get() { .build_unchecked(); let ctx = common::create_test_context(router_cfg).await; - let router = RouterFactory::create_router(&ctx).await.expect("router"); + let _router = RouterFactory::create_router(&ctx).await.expect("router"); // Create conversation let create_conv = serde_json::json!({}); - let conv_resp = router.create_conversation(None, &create_conv).await; + let conv_resp = + conversations::create_conversation(&ctx.conversation_storage, create_conv).await; assert_eq!(conv_resp.status(), StatusCode::OK); let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) .await @@ -1264,9 +1269,13 @@ async fn test_conversation_items_create_and_get() { ] }); - let items_resp = router - .create_conversation_items(None, conv_id, &create_items) - .await; + let items_resp = conversations::create_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + create_items, + ) + .await; assert_eq!(items_resp.status(), StatusCode::OK); let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) .await @@ -1279,9 +1288,14 @@ async fn test_conversation_items_create_and_get() { // Get first item let item_id = items_json["data"][0]["id"].as_str().unwrap(); - let get_resp = router - .get_conversation_item(None, conv_id, item_id, None) - .await; + let get_resp = conversations::get_conversation_item( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + item_id, + None, + ) + .await; assert_eq!(get_resp.status(), StatusCode::OK); let get_bytes = axum::body::to_bytes(get_resp.into_body(), usize::MAX) .await @@ -1312,11 +1326,12 @@ async fn test_conversation_items_delete() { .build_unchecked(); let ctx = common::create_test_context(router_cfg).await; - let router = RouterFactory::create_router(&ctx).await.expect("router"); + let _router = RouterFactory::create_router(&ctx).await.expect("router"); // Create conversation let create_conv = serde_json::json!({}); - let conv_resp = router.create_conversation(None, &create_conv).await; + let conv_resp = + conversations::create_conversation(&ctx.conversation_storage, create_conv).await; let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) .await .unwrap(); @@ -1334,9 +1349,13 @@ async fn test_conversation_items_delete() { ] }); - let items_resp = router - .create_conversation_items(None, conv_id, &create_items) - .await; + let items_resp = conversations::create_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + create_items, + ) + .await; let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) .await .unwrap(); @@ -1344,9 +1363,15 @@ async fn test_conversation_items_delete() { let item_id = items_json["data"][0]["id"].as_str().unwrap(); // List items (should have 1) - let list_resp = router - .list_conversation_items(None, conv_id, None, None, None) - .await; + let list_resp = conversations::list_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + None, + None, + None, + ) + .await; let list_bytes = axum::body::to_bytes(list_resp.into_body(), usize::MAX) .await .unwrap(); @@ -1354,15 +1379,25 @@ async fn test_conversation_items_delete() { assert_eq!(list_json["data"].as_array().unwrap().len(), 1); // Delete item - let del_resp = router - .delete_conversation_item(None, conv_id, item_id) - .await; + let del_resp = conversations::delete_conversation_item( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + item_id, + ) + .await; assert_eq!(del_resp.status(), StatusCode::OK); // List items again (should have 0) - let list_resp2 = router - .list_conversation_items(None, conv_id, None, None, None) - .await; + let list_resp2 = conversations::list_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + None, + None, + None, + ) + .await; let list_bytes2 = axum::body::to_bytes(list_resp2.into_body(), usize::MAX) .await .unwrap(); @@ -1370,9 +1405,14 @@ async fn test_conversation_items_delete() { assert_eq!(list_json2["data"].as_array().unwrap().len(), 0); // Item should NOT be gettable from this conversation after deletion (link removed) - let get_resp = router - .get_conversation_item(None, conv_id, item_id, None) - .await; + let get_resp = conversations::get_conversation_item( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + item_id, + None, + ) + .await; assert_eq!(get_resp.status(), StatusCode::NOT_FOUND); } @@ -1394,11 +1434,12 @@ async fn test_conversation_items_max_limit() { .build_unchecked(); let ctx = common::create_test_context(router_cfg).await; - let router = RouterFactory::create_router(&ctx).await.expect("router"); + let _router = RouterFactory::create_router(&ctx).await.expect("router"); // Create conversation let create_conv = serde_json::json!({}); - let conv_resp = router.create_conversation(None, &create_conv).await; + let conv_resp = + conversations::create_conversation(&ctx.conversation_storage, create_conv).await; let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) .await .unwrap(); @@ -1416,9 +1457,13 @@ async fn test_conversation_items_max_limit() { } let create_items = serde_json::json!({ "items": items }); - let items_resp = router - .create_conversation_items(None, conv_id, &create_items) - .await; + let items_resp = conversations::create_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + create_items, + ) + .await; assert_eq!(items_resp.status(), StatusCode::BAD_REQUEST); let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) @@ -1446,11 +1491,12 @@ async fn test_conversation_items_unsupported_type() { .build_unchecked(); let ctx = common::create_test_context(router_cfg).await; - let router = RouterFactory::create_router(&ctx).await.expect("router"); + let _router = RouterFactory::create_router(&ctx).await.expect("router"); // Create conversation let create_conv = serde_json::json!({}); - let conv_resp = router.create_conversation(None, &create_conv).await; + let conv_resp = + conversations::create_conversation(&ctx.conversation_storage, create_conv).await; let conv_bytes = axum::body::to_bytes(conv_resp.into_body(), usize::MAX) .await .unwrap(); @@ -1467,9 +1513,13 @@ async fn test_conversation_items_unsupported_type() { ] }); - let items_resp = router - .create_conversation_items(None, conv_id, &create_items) - .await; + let items_resp = conversations::create_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_id, + create_items, + ) + .await; assert_eq!(items_resp.status(), StatusCode::BAD_REQUEST); let items_bytes = axum::body::to_bytes(items_resp.into_body(), usize::MAX) @@ -1497,21 +1547,19 @@ async fn test_conversation_items_multi_conversation_sharing() { .build_unchecked(); let ctx = common::create_test_context(router_cfg).await; - let router = RouterFactory::create_router(&ctx).await.expect("router"); + let _router = RouterFactory::create_router(&ctx).await.expect("router"); // Create two conversations - let conv_a_resp = router - .create_conversation(None, &serde_json::json!({})) - .await; + let conv_a_resp = + conversations::create_conversation(&ctx.conversation_storage, serde_json::json!({})).await; let conv_a_bytes = axum::body::to_bytes(conv_a_resp.into_body(), usize::MAX) .await .unwrap(); let conv_a_json: serde_json::Value = serde_json::from_slice(&conv_a_bytes).unwrap(); let conv_a_id = conv_a_json["id"].as_str().unwrap(); - let conv_b_resp = router - .create_conversation(None, &serde_json::json!({})) - .await; + let conv_b_resp = + conversations::create_conversation(&ctx.conversation_storage, serde_json::json!({})).await; let conv_b_bytes = axum::body::to_bytes(conv_b_resp.into_body(), usize::MAX) .await .unwrap(); @@ -1529,9 +1577,13 @@ async fn test_conversation_items_multi_conversation_sharing() { ] }); - let items_a_resp = router - .create_conversation_items(None, conv_a_id, &create_items) - .await; + let items_a_resp = conversations::create_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_a_id, + create_items, + ) + .await; let items_a_bytes = axum::body::to_bytes(items_a_resp.into_body(), usize::MAX) .await .unwrap(); @@ -1548,24 +1600,40 @@ async fn test_conversation_items_multi_conversation_sharing() { ] }); - let items_b_resp = router - .create_conversation_items(None, conv_b_id, &reference_items) - .await; + let items_b_resp = conversations::create_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_b_id, + reference_items, + ) + .await; assert_eq!(items_b_resp.status(), StatusCode::OK); // Verify item appears in both conversations - let list_a = router - .list_conversation_items(None, conv_a_id, None, None, None) - .await; + let list_a = conversations::list_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_a_id, + None, + None, + None, + ) + .await; let list_a_bytes = axum::body::to_bytes(list_a.into_body(), usize::MAX) .await .unwrap(); let list_a_json: serde_json::Value = serde_json::from_slice(&list_a_bytes).unwrap(); assert_eq!(list_a_json["data"].as_array().unwrap().len(), 1); - let list_b = router - .list_conversation_items(None, conv_b_id, None, None, None) - .await; + let list_b = conversations::list_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_b_id, + None, + None, + None, + ) + .await; let list_b_bytes = axum::body::to_bytes(list_b.into_body(), usize::MAX) .await .unwrap(); @@ -1573,14 +1641,24 @@ async fn test_conversation_items_multi_conversation_sharing() { assert_eq!(list_b_json["data"].as_array().unwrap().len(), 1); // Delete from conversation A - router - .delete_conversation_item(None, conv_a_id, item_id) - .await; + conversations::delete_conversation_item( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_a_id, + item_id, + ) + .await; // Should be removed from A - let list_a2 = router - .list_conversation_items(None, conv_a_id, None, None, None) - .await; + let list_a2 = conversations::list_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_a_id, + None, + None, + None, + ) + .await; let list_a2_bytes = axum::body::to_bytes(list_a2.into_body(), usize::MAX) .await .unwrap(); @@ -1588,9 +1666,15 @@ async fn test_conversation_items_multi_conversation_sharing() { assert_eq!(list_a2_json["data"].as_array().unwrap().len(), 0); // Should still exist in B (soft delete) - let list_b2 = router - .list_conversation_items(None, conv_b_id, None, None, None) - .await; + let list_b2 = conversations::list_conversation_items( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_b_id, + None, + None, + None, + ) + .await; let list_b2_bytes = axum::body::to_bytes(list_b2.into_body(), usize::MAX) .await .unwrap(); @@ -1598,8 +1682,13 @@ async fn test_conversation_items_multi_conversation_sharing() { assert_eq!(list_b2_json["data"].as_array().unwrap().len(), 1); // Item should still be directly gettable - let get_resp = router - .get_conversation_item(None, conv_b_id, item_id, None) - .await; + let get_resp = conversations::get_conversation_item( + &ctx.conversation_storage, + &ctx.conversation_item_storage, + conv_b_id, + item_id, + None, + ) + .await; assert_eq!(get_resp.status(), StatusCode::OK); }