diff --git a/core/providers/anthropic.go b/core/providers/anthropic.go
index b7c34a2b1e..bce0bedb25 100644
--- a/core/providers/anthropic.go
+++ b/core/providers/anthropic.go
@@ -70,7 +70,7 @@ type AnthropicStreamEvent struct {
 	Index        *int                   `json:"index,omitempty"`
 	ContentBlock *AnthropicContentBlock `json:"content_block,omitempty"`
 	Delta        *AnthropicDelta        `json:"delta,omitempty"`
-	Usage        *schemas.LLMUsage      `json:"usage,omitempty"`
+	Usage        *AnthropicUsage        `json:"usage,omitempty"`
 	Error        *AnthropicStreamError  `json:"error,omitempty"`
 }
 
@@ -84,7 +84,35 @@ type AnthropicStreamMessage struct {
 	Model        string            `json:"model"`
 	StopReason   *string           `json:"stop_reason"`
 	StopSequence *string           `json:"stop_sequence"`
-	Usage        *schemas.LLMUsage `json:"usage"`
+	Usage        *AnthropicUsage   `json:"usage"`
+}
+
+// AnthropicUsage represents usage information in Anthropic format
+type AnthropicUsage struct {
+	InputTokens              int `json:"input_tokens,omitempty"`
+	OutputTokens             int `json:"output_tokens"`
+	CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"`
+	CacheReadInputTokens     int `json:"cache_read_input_tokens,omitempty"`
+}
+
+func (u *AnthropicUsage) ToLLMUsage() *schemas.LLMUsage {
+	if u == nil {
+		return nil
+	}
+
+	llmUsage := &schemas.LLMUsage{
+		PromptTokens:     u.InputTokens,
+		CompletionTokens: u.OutputTokens,
+		TotalTokens:      u.InputTokens + u.OutputTokens,
+	}
+
+	if u.CacheReadInputTokens > 0 {
+		llmUsage.TokenDetails = &schemas.TokenDetails{
+			CachedTokens: u.CacheReadInputTokens,
+		}
+	}
+
+	return llmUsage
 }
 
 // AnthropicContentBlock represents a content block in Anthropic responses.
@@ -487,6 +515,10 @@ func prepareAnthropicChatRequest(messages []schemas.BifrostMessage, params *sche
 					"tool_use_id": *msg.ToolMessage.ToolCallID,
 				}
 
+				if msg.ToolMessage.IsError != nil {
+					toolCallResult["is_error"] = *msg.ToolMessage.IsError
+				}
+
 				var toolCallResultContent []map[string]interface{}
 
 				if msg.Content.ContentStr != nil {
@@ -871,6 +903,7 @@ func handleAnthropicStreaming(
 	// Track minimal state needed for response format
 	var messageID string
 	var modelName string
+	var usage *AnthropicUsage
 
 	// Track SSE event parsing state
 	var eventType string
@@ -899,6 +932,8 @@
 			continue
 		}
 
+		// logger.Debug(fmt.Sprintf("Received event: %s, %s", eventType, eventData))
+
 		// Handle different event types
 		switch eventType {
 		case "message_start":
@@ -910,6 +945,7 @@
 			if event.Message != nil {
 				messageID = event.Message.ID
 				modelName = event.Message.Model
+				usage = event.Message.Usage
 			}
 
 		case "content_block_start":
@@ -948,6 +984,7 @@
 						},
 					},
 				},
+				Usage: usage.ToLLMUsage(),
 				ExtraFields: schemas.BifrostResponseExtraFields{
 					Provider: providerType,
 				},
@@ -986,6 +1023,7 @@
 						},
 					},
 				},
+				Usage: usage.ToLLMUsage(),
 				ExtraFields: schemas.BifrostResponseExtraFields{
 					Provider: providerType,
 				},
@@ -1027,6 +1065,7 @@
 						},
 					},
 				},
+				Usage: usage.ToLLMUsage(),
 				ExtraFields: schemas.BifrostResponseExtraFields{
 					Provider: providerType,
 				},
@@ -1065,6 +1104,7 @@
 						},
 					},
 				},
+				Usage: usage.ToLLMUsage(),
 				ExtraFields: schemas.BifrostResponseExtraFields{
 					Provider: providerType,
 				},
@@ -1096,6 +1136,7 @@
 						},
 					},
 				},
+				Usage: usage.ToLLMUsage(),
 				ExtraFields: schemas.BifrostResponseExtraFields{
 					Provider: providerType,
 				},
@@ -1128,6 +1169,9 @@
 			}
 
 			// Handle delta changes to the top-level message
+			if event.Usage != nil && usage != nil {
+				usage.OutputTokens = event.Usage.OutputTokens
+			}
 
 			// Send usage information immediately if present
 			if event.Usage != nil {
@@ -1135,7 +1179,6 @@
 					ID:     messageID,
 					Object: "chat.completion.chunk",
 					Model:  modelName,
-					Usage:  event.Usage,
 					Choices: []schemas.BifrostResponseChoice{
 						{
 							Index: 0,
@@ -1145,6 +1188,7 @@
 							FinishReason: event.Delta.StopReason,
 						},
 					},
+					Usage: usage.ToLLMUsage(),
 					ExtraFields: schemas.BifrostResponseExtraFields{
 						Provider: providerType,
 					},
diff --git a/core/providers/openai.go b/core/providers/openai.go
index 66cda30dea..6eb1ace04d 100644
--- a/core/providers/openai.go
+++ b/core/providers/openai.go
@@ -222,12 +222,23 @@ func prepareOpenAIChatRequest(messages []schemas.BifrostMessage, params *schemas
 	for _, msg := range messages {
 		if msg.Role == schemas.ModelChatMessageRoleAssistant {
 			assistantMessage := map[string]interface{}{
-				"role":    msg.Role,
-				"content": msg.Content,
+				"role": msg.Role,
 			}
 			if msg.AssistantMessage != nil && msg.AssistantMessage.ToolCalls != nil {
 				assistantMessage["tool_calls"] = *msg.AssistantMessage.ToolCalls
 			}
+			if msg.Content.ContentStr != nil {
+				assistantMessage["content"] = *msg.Content.ContentStr
+			} else if msg.Content.ContentBlocks != nil && len(*msg.Content.ContentBlocks) > 0 {
+				var sb strings.Builder
+				for _, block := range *msg.Content.ContentBlocks {
+					if block.Text != nil && *block.Text != "" {
+						sb.WriteString(*block.Text)
+						sb.WriteString(" ")
+					}
+				}
+				assistantMessage["content"] = sb.String()
+			}
 			formattedMessages = append(formattedMessages, assistantMessage)
 		} else {
 			message := map[string]interface{}{
@@ -250,6 +261,24 @@ func prepareOpenAIChatRequest(messages []schemas.BifrostMessage, params *schemas
 
 			if msg.ToolMessage != nil && msg.ToolMessage.ToolCallID != nil {
 				message["tool_call_id"] = *msg.ToolMessage.ToolCallID
+				if msg.IsError != nil {
+					message["is_error"] = *msg.IsError
+				}
+
+				content := message["content"]
+				if contentBlocks, ok := content.([]schemas.ContentBlock); ok {
+					var sb strings.Builder
+					for _, block := range contentBlocks {
+						if block.Text != nil && *block.Text != "" {
+							sb.WriteString(*block.Text)
+							sb.WriteString(" ")
+						} else if block.ImageURL != nil {
+							sb.WriteString(block.ImageURL.URL)
+							sb.WriteString(" ")
+						}
+					}
+					message["content"] = sb.String()
+				}
 			}
 
 			formattedMessages = append(formattedMessages, message)
@@ -565,6 +594,9 @@
 
 		// Handle usage-only chunks (when stream_options include_usage is true)
 		if len(response.Choices) == 0 && response.Usage != nil {
+			// Empty choices array.
+			response.Choices = []schemas.BifrostResponseChoice{}
+
 			// This is a usage information chunk at the end of stream
 			if params != nil {
 				response.ExtraFields.Params = *params
@@ -590,9 +622,7 @@
 			response.ExtraFields.Provider = providerType
 
 			processAndSendResponse(ctx, postHookRunner, &response, responseChan)
-
-			// End stream processing after finish reason
-			break
+			continue
 		}
 
 		// Handle regular content chunks
@@ -603,6 +633,7 @@
 		response.ExtraFields.Provider = providerType
 
 		processAndSendResponse(ctx, postHookRunner, &response, responseChan)
+		continue
 	}
 }
diff --git a/core/schemas/bifrost.go b/core/schemas/bifrost.go
index 35bd09f470..b0cc3cea36 100644
--- a/core/schemas/bifrost.go
+++ b/core/schemas/bifrost.go
@@ -10,6 +10,11 @@ const (
 	DefaultInitialPoolSize = 100
 )
 
+// StreamOptions represents the options for streaming requests.
+type StreamOptions struct {
+	IncludeUsage bool `json:"include_usage"`
+}
+
 // BifrostConfig represents the configuration for initializing a Bifrost instance.
 // It contains the necessary components for setting up the system including account details,
 // plugins, logging, and initial pool size.
@@ -161,19 +166,20 @@
 // your request to the model. Bifrost follows a standard set of parameters which
 // mapped to the provider's parameters.
 type ModelParameters struct {
-	ToolChoice        *ToolChoice `json:"tool_choice,omitempty"`         // Whether to call a tool
-	Tools             *[]Tool     `json:"tools,omitempty"`               // Tools to use
-	Temperature       *float64    `json:"temperature,omitempty"`         // Controls randomness in the output
-	TopP              *float64    `json:"top_p,omitempty"`               // Controls diversity via nucleus sampling
-	TopK              *int        `json:"top_k,omitempty"`               // Controls diversity via top-k sampling
-	MaxTokens         *int        `json:"max_tokens,omitempty"`          // Maximum number of tokens to generate
-	StopSequences     *[]string   `json:"stop_sequences,omitempty"`      // Sequences that stop generation
-	PresencePenalty   *float64    `json:"presence_penalty,omitempty"`    // Penalizes repeated tokens
-	FrequencyPenalty  *float64    `json:"frequency_penalty,omitempty"`   // Penalizes frequent tokens
-	ParallelToolCalls *bool       `json:"parallel_tool_calls,omitempty"` // Enables parallel tool calls
-	EncodingFormat    *string     `json:"encoding_format,omitempty"`     // Format for embedding output (e.g., "float", "base64")
-	Dimensions        *int        `json:"dimensions,omitempty"`          // Number of dimensions for embedding output
-	User              *string     `json:"user,omitempty"`                // User identifier for tracking
+	ToolChoice        *ToolChoice    `json:"tool_choice,omitempty"`         // Whether to call a tool
+	Tools             *[]Tool        `json:"tools,omitempty"`               // Tools to use
+	Temperature       *float64       `json:"temperature,omitempty"`         // Controls randomness in the output
+	TopP              *float64       `json:"top_p,omitempty"`               // Controls diversity via nucleus sampling
+	TopK              *int           `json:"top_k,omitempty"`               // Controls diversity via top-k sampling
+	MaxTokens         *int           `json:"max_tokens,omitempty"`          // Maximum number of tokens to generate
+	StopSequences     *[]string      `json:"stop_sequences,omitempty"`      // Sequences that stop generation
+	PresencePenalty   *float64       `json:"presence_penalty,omitempty"`    // Penalizes repeated tokens
+	FrequencyPenalty  *float64       `json:"frequency_penalty,omitempty"`   // Penalizes frequent tokens
+	ParallelToolCalls *bool          `json:"parallel_tool_calls,omitempty"` // Enables parallel tool calls
+	EncodingFormat    *string        `json:"encoding_format,omitempty"`     // Format for embedding output (e.g., "float", "base64")
+	Dimensions        *int           `json:"dimensions,omitempty"`          // Number of dimensions for embedding output
+	User              *string        `json:"user,omitempty"`                // User identifier for tracking
+	StreamOptions     *StreamOptions `json:"stream_options,omitempty"`      // Stream options for streaming requests
 	// Dynamic parameters that can be provider-specific, they are directly
 	// added to the request as is.
 	ExtraParams map[string]interface{} `json:"-"`
@@ -351,6 +357,7 @@ type ContentBlock struct {
 
 // ToolMessage represents a message from a tool
 type ToolMessage struct {
 	ToolCallID *string `json:"tool_call_id,omitempty"`
+	IsError    *bool   `json:"is_error,omitempty"`
 }
 
 // AssistantMessage represents a message from an assistant
@@ -371,18 +378,41 @@ type ImageURLStruct struct {
 
 // BifrostResponse represents the complete result from any bifrost request.
 type BifrostResponse struct {
-	ID                string                     `json:"id,omitempty"`
-	Object            string                     `json:"object,omitempty"` // text.completion, chat.completion, or embedding
-	Choices           []BifrostResponseChoice    `json:"choices,omitempty"`
-	Embedding         [][]float32                `json:"data,omitempty"`       // Maps to "data" field in provider responses (e.g., OpenAI embedding format)
-	Speech            *BifrostSpeech             `json:"speech,omitempty"`     // Maps to "speech" field in provider responses (e.g., OpenAI speech format)
-	Transcribe        *BifrostTranscribe         `json:"transcribe,omitempty"` // Maps to "transcribe" field in provider responses (e.g., OpenAI transcription format)
-	Model             string                     `json:"model,omitempty"`
-	Created           int                        `json:"created,omitempty"` // The Unix timestamp (in seconds).
-	ServiceTier       *string                    `json:"service_tier,omitempty"`
-	SystemFingerprint *string                    `json:"system_fingerprint,omitempty"`
-	Usage             *LLMUsage                  `json:"usage,omitempty"`
-	ExtraFields       BifrostResponseExtraFields `json:"extra_fields"`
+	ID                  string                     `json:"id,omitempty"`
+	Object              string                     `json:"object,omitempty"` // text.completion, chat.completion, or embedding
+	Choices             []BifrostResponseChoice    `json:"choices,omitempty"`
+	Embedding           [][]float32                `json:"data,omitempty"`       // Maps to "data" field in provider responses (e.g., OpenAI embedding format)
+	Speech              *BifrostSpeech             `json:"speech,omitempty"`     // Maps to "speech" field in provider responses (e.g., OpenAI speech format)
+	Transcribe          *BifrostTranscribe         `json:"transcribe,omitempty"` // Maps to "transcribe" field in provider responses (e.g., OpenAI transcription format)
+	Model               string                     `json:"model,omitempty"`
+	Created             int                        `json:"created,omitempty"` // The Unix timestamp (in seconds).
+	ServiceTier         *string                    `json:"service_tier,omitempty"`
+	SystemFingerprint   *string                    `json:"system_fingerprint,omitempty"`
+	Usage               *LLMUsage                  `json:"usage,omitempty"`
+	PromptFilterResults *[]PromptFilterResult      `json:"prompt_filter_results,omitempty"` // Azure OpenAI Service
+	ExtraFields         BifrostResponseExtraFields `json:"extra_fields"`
+}
+
+// FilterResult represents the result of a content filter.
+type FilterResult struct {
+	Filtered bool `json:"filtered"`
+	Severity bool `json:"severity"`
+}
+
+// ContentFilterResult represents the result of a content filter.
+type ContentFilterResult struct {
+	HateSpeech FilterResult `json:"hate_speech,omitempty"`
+	SelfHarm   FilterResult `json:"self_harm,omitempty"`
+	Sexual     FilterResult `json:"sexual,omitempty"`
+	Violence   FilterResult `json:"violence,omitempty"`
+	Jailbreak  FilterResult `json:"jailbreak,omitempty"`
+	Profanity  FilterResult `json:"profanity,omitempty"`
+}
+
+// PromptFilterResult represents the result of a prompt filter.
+type PromptFilterResult struct {
+	PromptIndex          int                  `json:"prompt_index"`
+	ContentFilterResults *ContentFilterResult `json:"content_filter_results"`
 }
 
 // LLMUsage represents token usage information
@@ -394,6 +424,36 @@ type LLMUsage struct {
 	CompletionTokensDetails *CompletionTokensDetails `json:"completion_tokens_details,omitempty"`
 }
 
+func (u *LLMUsage) Clone() *LLMUsage {
+	if u == nil {
+		return nil
+	}
+
+	ret := &LLMUsage{
+		PromptTokens:     u.PromptTokens,
+		CompletionTokens: u.CompletionTokens,
+		TotalTokens:      u.TotalTokens,
+	}
+
+	if u.TokenDetails != nil {
+		ret.TokenDetails = &TokenDetails{
+			CachedTokens: u.TokenDetails.CachedTokens,
+			AudioTokens:  u.TokenDetails.AudioTokens,
+		}
+	}
+
+	if u.CompletionTokensDetails != nil {
+		ret.CompletionTokensDetails = &CompletionTokensDetails{
+			ReasoningTokens:          u.CompletionTokensDetails.ReasoningTokens,
+			AudioTokens:              u.CompletionTokensDetails.AudioTokens,
+			AcceptedPredictionTokens: u.CompletionTokensDetails.AcceptedPredictionTokens,
+			RejectedPredictionTokens: u.CompletionTokensDetails.RejectedPredictionTokens,
+		}
+	}
+
+	return ret
+}
+
 type AudioLLMUsage struct {
 	InputTokens        int                `json:"input_tokens"`
 	InputTokensDetails *AudioTokenDetails `json:"input_tokens_details,omitempty"`
@@ -494,8 +554,9 @@ type Annotation struct {
 // IMPORTANT: Only one of BifrostNonStreamResponseChoice or BifrostStreamResponseChoice
 // should be non-nil at a time.
 type BifrostResponseChoice struct {
-	Index        int     `json:"index"`
-	FinishReason *string `json:"finish_reason,omitempty"`
+	Index                int                  `json:"index"`
+	FinishReason         *string              `json:"finish_reason,omitempty"`
+	ContentFilterResults *ContentFilterResult `json:"content_filter_results,omitempty"` // Azure OpenAI Service or DeepSeek
 
 	*BifrostNonStreamResponseChoice
 	*BifrostStreamResponseChoice
diff --git a/transports/bifrost-http/integrations/anthropic/router.go b/transports/bifrost-http/integrations/anthropic/router.go
index e7d13ca8ed..41ec44434b 100644
--- a/transports/bifrost-http/integrations/anthropic/router.go
+++ b/transports/bifrost-http/integrations/anthropic/router.go
@@ -35,8 +35,8 @@ func NewAnthropicRouter(client *bifrost.Bifrost) *AnthropicRouter {
 				return DeriveAnthropicErrorFromBifrostError(err)
 			},
 			StreamConfig: &integrations.StreamConfig{
-				ResponseConverter: func(resp *schemas.BifrostResponse) (interface{}, error) {
-					return DeriveAnthropicStreamFromBifrostResponse(resp), nil
+				ResponseConverter: func(resp *schemas.BifrostResponse, streamIndex int) (interface{}, error) {
+					return DeriveAnthropicStreamFromBifrostResponse(resp, streamIndex), nil
 				},
 				ErrorConverter: func(err *schemas.BifrostError) interface{} {
 					return DeriveAnthropicStreamFromBifrostError(err)
diff --git a/transports/bifrost-http/integrations/anthropic/types.go b/transports/bifrost-http/integrations/anthropic/types.go
index 0833a3824e..3185b69275 100644
--- a/transports/bifrost-http/integrations/anthropic/types.go
+++ b/transports/bifrost-http/integrations/anthropic/types.go
@@ -20,6 +20,7 @@ type AnthropicContentBlock struct {
 	Name    *string               `json:"name,omitempty"`    // For tool_use content
 	Input   interface{}           `json:"input,omitempty"`   // For tool_use content
 	Content AnthropicContent      `json:"content,omitempty"` // For tool_result content
+	IsError *bool                 `json:"is_error,omitempty"` // For tool_result content
 	Source  *AnthropicImageSource `json:"source,omitempty"`  // For image content
 }
 
@@ -62,17 +63,18 @@ type AnthropicToolChoice struct {
 
 // AnthropicMessageRequest represents an Anthropic messages API request
 type AnthropicMessageRequest struct {
-	Model         string               `json:"model"`
-	MaxTokens     int                  `json:"max_tokens"`
-	Messages      []AnthropicMessage   `json:"messages"`
-	System        *AnthropicContent    `json:"system,omitempty"`
-	Temperature   *float64             `json:"temperature,omitempty"`
-	TopP          *float64             `json:"top_p,omitempty"`
-	TopK          *int                 `json:"top_k,omitempty"`
-	StopSequences *[]string            `json:"stop_sequences,omitempty"`
-	Stream        *bool                `json:"stream,omitempty"`
-	Tools         *[]AnthropicTool     `json:"tools,omitempty"`
-	ToolChoice    *AnthropicToolChoice `json:"tool_choice,omitempty"`
+	Model         string                 `json:"model"`
+	MaxTokens     int                    `json:"max_tokens"`
+	Messages      []AnthropicMessage     `json:"messages"`
+	System        *AnthropicContent      `json:"system,omitempty"`
+	Temperature   *float64               `json:"temperature,omitempty"`
+	TopP          *float64               `json:"top_p,omitempty"`
+	TopK          *int                   `json:"top_k,omitempty"`
+	StopSequences *[]string              `json:"stop_sequences,omitempty"`
+	Stream        *bool                  `json:"stream,omitempty"`
+	StreamOptions *schemas.StreamOptions `json:"stream_options,omitempty"`
+	Tools         *[]AnthropicTool       `json:"tools,omitempty"`
+	ToolChoice    *AnthropicToolChoice   `json:"tool_choice,omitempty"`
 }
 
 // IsStreamingRequested implements the StreamingRequest interface
@@ -94,8 +96,10 @@ type AnthropicMessageResponse struct {
 
 // AnthropicUsage represents usage information in Anthropic format
 type AnthropicUsage struct {
-	InputTokens  int `json:"input_tokens"`
-	OutputTokens int `json:"output_tokens"`
+	InputTokens              int `json:"input_tokens,omitempty"`
+	OutputTokens             int `json:"output_tokens"`
+	CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"`
+	CacheReadInputTokens     int `json:"cache_read_input_tokens,omitempty"`
 }
 
 // AnthropicMessageError represents an Anthropic messages API error response
@@ -123,6 +127,14 @@
 	Usage *AnthropicUsage `json:"usage,omitempty"`
 }
 
+func (s *AnthropicStreamResponse) ToSSE() string {
+	jsonData, err := json.Marshal(s)
+	if err != nil {
+		return "event: error\ndata: {\"type\": \"error\", \"error\": {\"type\": \"internal_error\", \"message\": \"Failed to marshal stream response\"}}\n\n"
+	}
+	return fmt.Sprintf("event: %s\ndata: %s\n\n", s.Type, string(jsonData))
+}
+
 // AnthropicStreamMessage represents the message structure in streaming events
 type AnthropicStreamMessage struct {
 	ID string `json:"id"`
@@ -137,7 +149,7 @@
 
 // AnthropicStreamDelta represents the incremental content in a streaming chunk
 type AnthropicStreamDelta struct {
-	Type        string  `json:"type"`
+	Type        string  `json:"type,omitempty"`
 	Text        *string `json:"text,omitempty"`
 	Thinking    *string `json:"thinking,omitempty"`
 	PartialJSON *string `json:"partial_json,omitempty"`
@@ -235,6 +247,8 @@ func (r *AnthropicMessageRequest) ConvertToBifrostRequest() *schemas.BifrostRequ
 			var toolCalls []schemas.ToolCall
 			var contentBlocks []schemas.ContentBlock
 
+			skipAppendMessage := false
+
 			for _, content := range *msg.Content.ContentBlocks {
 				switch content.Type {
 				case "text":
@@ -278,48 +292,69 @@ func (r *AnthropicMessageRequest) ConvertToBifrostRequest() *schemas.BifrostRequ
 						toolCalls = append(toolCalls, tc)
 					}
 				case "tool_result":
-					if content.ToolUseID != nil {
-						bifrostMsg.ToolMessage = &schemas.ToolMessage{
-							ToolCallID: content.ToolUseID,
-						}
-						if content.Content.ContentStr != nil {
-							contentBlocks = append(contentBlocks, schemas.ContentBlock{
-								Type: schemas.ContentBlockTypeText,
-								Text: content.Content.ContentStr,
-							})
-						} else if content.Content.ContentBlocks != nil {
-							for _, block := range *content.Content.ContentBlocks {
-								if block.Text != nil {
-									contentBlocks = append(contentBlocks, schemas.ContentBlock{
-										Type: schemas.ContentBlockTypeText,
-										Text: block.Text,
-									})
-								} else if block.Source != nil {
-									contentBlocks = append(contentBlocks, schemas.ContentBlock{
-										Type: schemas.ContentBlockTypeImage,
-										ImageURL: &schemas.ImageURLStruct{
-											URL: func() string {
-												if block.Source.Data != nil {
-													mime := "image/png"
-													if block.Source.MediaType != nil && *block.Source.MediaType != "" {
-														mime = *block.Source.MediaType
-													}
-													return "data:" + mime + ";base64," + *block.Source.Data
-												}
-												if block.Source.URL != nil {
-													return *block.Source.URL
+					if content.ToolUseID == nil || *content.ToolUseID == "" {
+						continue
+					}
+
+					skipAppendMessage = true
+
+					bifrostMsg.Role = schemas.ModelChatMessageRoleTool
+					bifrostMsg.ToolMessage = &schemas.ToolMessage{
+						ToolCallID: content.ToolUseID,
+						IsError:    content.IsError,
+					}
+					if content.Content.ContentStr != nil {
+						contentBlocks = append(contentBlocks, schemas.ContentBlock{
+							Type: schemas.ContentBlockTypeText,
+							Text: content.Content.ContentStr,
+						})
+					} else if content.Content.ContentBlocks != nil {
+						for _, block := range *content.Content.ContentBlocks {
+							if block.Text != nil {
+								contentBlocks = append(contentBlocks, schemas.ContentBlock{
+									Type: schemas.ContentBlockTypeText,
+									Text: block.Text,
+								})
+							} else if block.Source != nil {
+								contentBlocks = append(contentBlocks, schemas.ContentBlock{
+									Type: schemas.ContentBlockTypeImage,
+									ImageURL: &schemas.ImageURLStruct{
+										URL: func() string {
+											if block.Source.Data != nil {
+												mime := "image/png"
+												if block.Source.MediaType != nil && *block.Source.MediaType != "" {
+													mime = *block.Source.MediaType
 												}
-												return ""
-											}()},
-										})
-									}
+												return "data:" + mime + ";base64," + *block.Source.Data
+											}
+											if block.Source.URL != nil {
+												return *block.Source.URL
+											}
+											return ""
+										}()},
+									})
 							}
 						}
-						bifrostMsg.Role = schemas.ModelChatMessageRoleTool
 					}
+
+					if len(contentBlocks) > 0 {
+						blocks := make([]schemas.ContentBlock, len(contentBlocks))
+						copy(blocks, contentBlocks)
+						bifrostMsg.Content = schemas.MessageContent{
+							ContentBlocks: &blocks,
+						}
+						messages = append(messages, bifrostMsg)
+						bifrostMsg = schemas.BifrostMessage{}
+						contentBlocks = contentBlocks[:0]
+					}
+					continue
 				}
 			}
 
+			if skipAppendMessage {
+				continue
+			}
+
 			// Concatenate all text contents
 			if len(contentBlocks) > 0 {
 				bifrostMsg.Content = schemas.MessageContent{
@@ -357,6 +392,9 @@ func (r *AnthropicMessageRequest) ConvertToBifrostRequest() *schemas.BifrostRequ
 		if r.StopSequences != nil {
 			params.StopSequences = r.StopSequences
 		}
+		if r.StreamOptions != nil {
+			params.StreamOptions = r.StreamOptions
+		}
 
 		bifrostReq.Params = params
 	}
@@ -447,6 +485,9 @@ func DeriveAnthropicFromBifrostResponse(bifrostResp *schemas.BifrostResponse) *A
 			InputTokens:  bifrostResp.Usage.PromptTokens,
 			OutputTokens: bifrostResp.Usage.CompletionTokens,
 		}
+		if bifrostResp.Usage.TokenDetails != nil {
+			anthropicResp.Usage.CacheReadInputTokens = bifrostResp.Usage.TokenDetails.CachedTokens
+		}
 	}
 
 	// Convert choices to content
@@ -518,12 +559,12 @@ func DeriveAnthropicFromBifrostResponse(bifrostResp *schemas.BifrostResponse) *A
 }
 
 // DeriveAnthropicStreamFromBifrostResponse converts a Bifrost streaming response to Anthropic SSE string format
-func DeriveAnthropicStreamFromBifrostResponse(bifrostResp *schemas.BifrostResponse) string {
+func DeriveAnthropicStreamFromBifrostResponse(bifrostResp *schemas.BifrostResponse, streamIndex int) []*AnthropicStreamResponse {
 	if bifrostResp == nil {
-		return ""
+		return nil
 	}
 
-	streamResp := &AnthropicStreamResponse{}
+	var streamRespList []*AnthropicStreamResponse
 
 	// Handle different streaming event types based on the response content
 	if len(bifrostResp.Choices) > 0 {
@@ -533,18 +574,63 @@
 		if choice.BifrostStreamResponseChoice != nil {
 			delta := choice.BifrostStreamResponseChoice.Delta
 
+			// Handle message start event
+			if streamIndex == 0 {
+				streamResp := &AnthropicStreamResponse{}
+				var usage *AnthropicUsage
+
+				if bifrostResp.Usage != nil {
+					usage = &AnthropicUsage{
+						InputTokens:              bifrostResp.Usage.PromptTokens,
+						OutputTokens:             bifrostResp.Usage.CompletionTokens,
+						CacheCreationInputTokens: 0,
+					}
+					if bifrostResp.Usage.TokenDetails != nil {
+						usage.CacheReadInputTokens = bifrostResp.Usage.TokenDetails.CachedTokens
+					}
+				} else {
+					// Default to 1 token for input and output, e.g. for DeepSeek api.
+					// Return the actual usage in the final message delta.
+					usage = &AnthropicUsage{
+						InputTokens:  1,
+						OutputTokens: 1,
+					}
+				}
+				streamResp.Type = "message_start"
+				streamResp.Message = &AnthropicStreamMessage{
+					ID:      bifrostResp.ID,
+					Type:    "message",
+					Role:    "assistant",
+					Model:   bifrostResp.Model,
+					Usage:   usage,
+					Content: []AnthropicContentBlock{},
+				}
+
+				streamRespList = append(streamRespList, streamResp)
+			}
+
+			streamResp := &AnthropicStreamResponse{
+				Index: &choice.Index,
+			}
+
 			// Handle text content deltas
 			if delta.Content != nil {
-				streamResp.Type = "content_block_delta"
-				streamResp.Index = &choice.Index
-				streamResp.Delta = &AnthropicStreamDelta{
-					Type: "text_delta",
-					Text: delta.Content,
+				if streamIndex == 0 {
+					streamResp.Type = "content_block_start"
+					streamResp.ContentBlock = &AnthropicContentBlock{
+						Type: "text",
+						Text: delta.Content,
+					}
+				} else {
+					streamResp.Type = "content_block_delta"
+					streamResp.Delta = &AnthropicStreamDelta{
+						Type: "text_delta",
+						Text: delta.Content,
+					}
 				}
+			}
 			} else if delta.Thought != nil {
 				// Handle thinking content deltas
 				streamResp.Type = "content_block_delta"
-				streamResp.Index = &choice.Index
 				streamResp.Delta = &AnthropicStreamDelta{
 					Type:     "thinking_delta",
 					Thinking: delta.Thought,
@@ -571,16 +657,39 @@
 						PartialJSON: &toolCall.Function.Arguments,
 					}
 				}
-			} else if choice.FinishReason != nil && *choice.FinishReason != "" {
-				// Handle finish reason
-				streamResp.Type = "message_delta"
-				streamResp.Delta = &AnthropicStreamDelta{
-					Type:       "message_delta",
-					StopReason: choice.FinishReason,
+			}
+
+			// Handle finish reason
+			if choice.FinishReason != nil && *choice.FinishReason != "" {
+				streamRespList = append(streamRespList, streamResp)
+
+				// Handle content block stop
+				streamRespList = append(streamRespList, &AnthropicStreamResponse{
+					Index: &choice.Index,
+					Type:  "content_block_stop",
+				})
+
+				// Handle message delta
+				usage := &AnthropicUsage{
+					OutputTokens: bifrostResp.Usage.CompletionTokens,
+					InputTokens:  bifrostResp.Usage.PromptTokens,
+				}
+				if bifrostResp.Usage.TokenDetails != nil {
+					usage.CacheReadInputTokens = bifrostResp.Usage.TokenDetails.CachedTokens
+				}
+				streamResp = &AnthropicStreamResponse{
+					Type: "message_delta",
+					Delta: &AnthropicStreamDelta{
+						StopReason: choice.FinishReason,
+					},
+					Usage: usage,
 				}
 			}
+
+			streamRespList = append(streamRespList, streamResp)
 		} else if choice.BifrostNonStreamResponseChoice != nil {
+			streamResp := &AnthropicStreamResponse{}
+
 			// Handle non-streaming response converted to streaming format
 			streamResp.Type = "message_start"
@@ -603,46 +712,23 @@
 			streamMessage.Content = content
 
 			streamResp.Message = streamMessage
-		}
-	}
 
-	// Handle usage information
-	if bifrostResp.Usage != nil {
-		if streamResp.Type == "" {
-			streamResp.Type = "message_delta"
-		}
-		streamResp.Usage = &AnthropicUsage{
-			InputTokens:  bifrostResp.Usage.PromptTokens,
-			OutputTokens: bifrostResp.Usage.CompletionTokens,
+			streamRespList = append(streamRespList, streamResp)
 		}
-	}
 
-	// Set common fields
-	if bifrostResp.ID != "" {
-		streamResp.ID = &bifrostResp.ID
-	}
-	if bifrostResp.Model != "" {
-		streamResp.Model = &bifrostResp.Model
-	}
 	}
 
-	// Default to empty content_block_delta if no specific type was set
-	if streamResp.Type == "" {
-		streamResp.Type = "content_block_delta"
-		streamResp.Index = bifrost.Ptr(0)
-		streamResp.Delta = &AnthropicStreamDelta{
-			Type: "text_delta",
-			Text: bifrost.Ptr(""),
+	result := make([]*AnthropicStreamResponse, 0, len(streamRespList))
+	for _, streamResp := range streamRespList {
+		// Ignore empty stream responses
+		if streamResp.Type == "" {
+			continue
 		}
-	}
 
-	// Marshal to JSON and format as SSE
-	jsonData, err := json.Marshal(streamResp)
-	if err != nil {
-		return ""
+		result = append(result, streamResp)
 	}
 
-	// Format as Anthropic SSE
-	return fmt.Sprintf("event: %s\ndata: %s\n\n", streamResp.Type, jsonData)
+	return result
 }
 
 // DeriveAnthropicErrorFromBifrostError derives a AnthropicMessageError from a BifrostError
diff --git a/transports/bifrost-http/integrations/genai/router.go b/transports/bifrost-http/integrations/genai/router.go
index 7739611cae..1971868854 100644
--- a/transports/bifrost-http/integrations/genai/router.go
+++ b/transports/bifrost-http/integrations/genai/router.go
@@ -38,7 +38,7 @@ func NewGenAIRouter(client *bifrost.Bifrost) *GenAIRouter {
 				return DeriveGeminiErrorFromBifrostError(err)
 			},
 			StreamConfig: &integrations.StreamConfig{
-				ResponseConverter: func(resp *schemas.BifrostResponse) (interface{}, error) {
+				ResponseConverter: func(resp *schemas.BifrostResponse, streamIndex int) (interface{}, error) {
 					return DeriveGeminiStreamFromBifrostResponse(resp), nil
 				},
 				ErrorConverter: func(err *schemas.BifrostError) interface{} {
diff --git a/transports/bifrost-http/integrations/litellm/router.go b/transports/bifrost-http/integrations/litellm/router.go
index cede7a42f7..9c1d878733 100644
--- a/transports/bifrost-http/integrations/litellm/router.go
+++ b/transports/bifrost-http/integrations/litellm/router.go
@@ -168,7 +168,7 @@ func NewLiteLLMRouter(client *bifrost.Bifrost) *LiteLLMRouter {
 		}
 	}
 
-	streamResponseConverter := func(resp *schemas.BifrostResponse) (interface{}, error) {
+	streamResponseConverter := func(resp *schemas.BifrostResponse, streamIndex int) (interface{}, error) {
 		if resp == nil {
 			return nil, errors.New("response is nil")
 		}
@@ -183,7 +183,7 @@ func NewLiteLLMRouter(client *bifrost.Bifrost) *LiteLLMRouter {
 		case schemas.OpenAI, schemas.Azure:
 			return openai.DeriveOpenAIStreamFromBifrostResponse(resp), nil
 		case schemas.Anthropic:
-			return anthropic.DeriveAnthropicStreamFromBifrostResponse(resp), nil
+			return anthropic.DeriveAnthropicStreamFromBifrostResponse(resp, streamIndex), nil
 		case schemas.Vertex:
 			return genai.DeriveGeminiStreamFromBifrostResponse(resp), nil
 		default:
diff --git a/transports/bifrost-http/integrations/openai/router.go b/transports/bifrost-http/integrations/openai/router.go
index 226e8abbb9..7bdbda836a 100644
--- a/transports/bifrost-http/integrations/openai/router.go
+++ b/transports/bifrost-http/integrations/openai/router.go
@@ -45,7 +45,7 @@ func NewOpenAIRouter(client *bifrost.Bifrost) *OpenAIRouter {
 				return DeriveOpenAIErrorFromBifrostError(err)
 			},
 			StreamConfig: &integrations.StreamConfig{
-				ResponseConverter: func(resp *schemas.BifrostResponse) (interface{}, error) {
+				ResponseConverter: func(resp *schemas.BifrostResponse, streamIndex int) (interface{}, error) {
 					return DeriveOpenAIStreamFromBifrostResponse(resp), nil
 				},
 				ErrorConverter: func(err *schemas.BifrostError) interface{} {
@@ -84,7 +84,7 @@ func NewOpenAIRouter(client *bifrost.Bifrost) *OpenAIRouter {
 				return DeriveOpenAIErrorFromBifrostError(err)
 			},
 			StreamConfig: &integrations.StreamConfig{
-				ResponseConverter: func(resp *schemas.BifrostResponse) (interface{}, error) {
+				ResponseConverter: func(resp *schemas.BifrostResponse, streamIndex int) (interface{}, error) {
 					return DeriveOpenAISpeechFromBifrostResponse(resp), nil
 				},
 				ErrorConverter: func(err *schemas.BifrostError) interface{} {
@@ -119,7 +119,7 @@ func NewOpenAIRouter(client *bifrost.Bifrost) *OpenAIRouter {
 				return DeriveOpenAIErrorFromBifrostError(err)
 			},
 			StreamConfig: &integrations.StreamConfig{
-				ResponseConverter: func(resp *schemas.BifrostResponse) (interface{}, error) {
+				ResponseConverter: func(resp *schemas.BifrostResponse, streamIndex int) (interface{}, error) {
 					return DeriveOpenAITranscriptionFromBifrostResponse(resp), nil
 				},
 				ErrorConverter: func(err *schemas.BifrostError) interface{} {
diff --git a/transports/bifrost-http/integrations/openai/types.go b/transports/bifrost-http/integrations/openai/types.go
index b11ae1594f..87f0905af5 100644
--- a/transports/bifrost-http/integrations/openai/types.go
+++ b/transports/bifrost-http/integrations/openai/types.go
@@ -1,6 +1,9 @@
 package openai
 
 import (
+	"encoding/json"
+	"fmt"
+
 	"github.com/maximhq/bifrost/core/schemas"
 	"github.com/maximhq/bifrost/transports/bifrost-http/integrations"
 )
@@ -21,6 +24,7 @@ type OpenAIChatRequest struct {
 	Tools          *[]schemas.Tool        `json:"tools,omitempty"` // Reuse schema type
 	ToolChoice     *schemas.ToolChoice    `json:"tool_choice,omitempty"`
 	Stream         *bool                  `json:"stream,omitempty"`
+	StreamOptions  *schemas.StreamOptions `json:"stream_options,omitempty"`
 	LogProbs       *bool                  `json:"logprobs,omitempty"`
 	TopLogProbs    *int                   `json:"top_logprobs,omitempty"`
 	ResponseFormat interface{}            `json:"response_format,omitempty"`
@@ -69,14 +73,15 @@ func (r *OpenAITranscriptionRequest) IsStreamingRequested() bool {
 
 // OpenAIChatResponse represents an OpenAI chat completion response
 type OpenAIChatResponse struct {
-	ID                string                          `json:"id"`
-	Object            string                          `json:"object"`
-	Created           int                             `json:"created"`
-	Model             string                          `json:"model"`
-	Choices           []schemas.BifrostResponseChoice `json:"choices"`
-	Usage             *schemas.LLMUsage               `json:"usage,omitempty"` // Reuse schema type
-	ServiceTier       *string                         `json:"service_tier,omitempty"`
-	SystemFingerprint *string                         `json:"system_fingerprint,omitempty"`
+	ID                  string                          `json:"id"`
+	Object              string                          `json:"object"`
+	Created             int                             `json:"created"`
+	Model               string                          `json:"model"`
+	Choices             []schemas.BifrostResponseChoice `json:"choices"`
+	Usage               *schemas.LLMUsage               `json:"usage,omitempty"` // Reuse schema type
+	ServiceTier         *string                         `json:"service_tier,omitempty"`
+	SystemFingerprint   *string                         `json:"system_fingerprint,omitempty"`
+	PromptFilterResults *[]schemas.PromptFilterResult   `json:"prompt_filter_results,omitempty"`
 }
 
 // OpenAIChatError represents an OpenAI chat completion error response
@@ -92,6 +97,11 @@
 	} `json:"error"`
 }
 
+func (e *OpenAIChatError) ToSSE() string {
+	data, _ := json.Marshal(e)
+	return fmt.Sprintf("data: %s\n\n", data)
+}
+
 // OpenAIChatErrorStruct represents the error structure of an OpenAI chat completion error response
 type OpenAIChatErrorStruct struct {
 	Type string `json:"type"` // Error type
@@ -103,10 +113,11 @@
 
 // OpenAIStreamChoice represents a choice in a streaming response chunk
 type OpenAIStreamChoice struct {
-	Index        int                `json:"index"`
-	Delta        *OpenAIStreamDelta `json:"delta,omitempty"`
-	FinishReason *string            `json:"finish_reason,omitempty"`
-	LogProbs     *schemas.LogProbs  `json:"logprobs,omitempty"`
+	Index                int                          `json:"index"`
+	Delta                *OpenAIStreamDelta           `json:"delta,omitempty"`
+	FinishReason         *string                      `json:"finish_reason,omitempty"`
+	LogProbs             *schemas.LogProbs            `json:"logprobs,omitempty"`
+	ContentFilterResults *schemas.ContentFilterResult `json:"content_filter_results,omitempty"`
 }
 
 // OpenAIStreamDelta represents the incremental content in a streaming chunk
@@ -127,6 +138,11 @@
 	Usage *schemas.LLMUsage `json:"usage,omitempty"`
 }
 
+func (r *OpenAIStreamResponse) ToSSE() string {
+	data, _ := json.Marshal(r)
+	return fmt.Sprintf("data: %s\n\n", data)
+}
+
 // ConvertToBifrostRequest converts an OpenAI chat request to Bifrost format
 func (r *OpenAIChatRequest) ConvertToBifrostRequest() *schemas.BifrostRequest {
 	provider, model := integrations.ParseModelString(r.Model, schemas.OpenAI)
@@ -265,6 +281,9 @@ func (r *OpenAIChatRequest) convertParameters() *schemas.ModelParameters {
 	if r.Seed != nil {
 		params.ExtraParams["seed"] = *r.Seed
 	}
+	if r.StreamOptions != nil {
+		params.StreamOptions = r.StreamOptions
+	}
 
 	return params
 }
@@ -310,14 +329,15 @@ func DeriveOpenAIFromBifrostResponse(bifrostResp *schemas.BifrostResponse) *Open
 	}
 
 	openaiResp := &OpenAIChatResponse{
-		ID:                bifrostResp.ID,
-		Object:            bifrostResp.Object,
-		Created:           bifrostResp.Created,
-		Model:             bifrostResp.Model,
-		Choices:           bifrostResp.Choices,
-		Usage:             bifrostResp.Usage,
-		ServiceTier:       bifrostResp.ServiceTier,
-		SystemFingerprint: bifrostResp.SystemFingerprint,
+		ID:                  bifrostResp.ID,
+		Object:              bifrostResp.Object,
+		Created:             bifrostResp.Created,
+		Model:               bifrostResp.Model,
+		Choices:             bifrostResp.Choices,
+		Usage:               bifrostResp.Usage,
+		ServiceTier:         bifrostResp.ServiceTier,
+		SystemFingerprint:   bifrostResp.SystemFingerprint,
+		PromptFilterResults: bifrostResp.PromptFilterResults,
 	}
 
 	return openaiResp
@@ -409,8 +429,9 @@ func DeriveOpenAIStreamFromBifrostResponse(bifrostResp *schemas.BifrostResponse)
 	// Convert choices to streaming format
 	for _, choice := range bifrostResp.Choices {
 		streamChoice := OpenAIStreamChoice{
-			Index:        choice.Index,
-			FinishReason: choice.FinishReason,
+			Index:                choice.Index,
+			FinishReason:         choice.FinishReason,
+			ContentFilterResults: choice.ContentFilterResults,
 		}
 
 		var delta *OpenAIStreamDelta
diff --git a/transports/bifrost-http/integrations/utils.go b/transports/bifrost-http/integrations/utils.go
index d81ba8260e..5b7ee1032d 100644
--- a/transports/bifrost-http/integrations/utils.go
+++ b/transports/bifrost-http/integrations/utils.go
@@ -84,8 +84,8 @@ type RequestConverter func(req interface{}) (*schemas.BifrostRequest, error)
 type ResponseConverter func(*schemas.BifrostResponse) (interface{}, error)
 
 // StreamResponseConverter is a function that converts Bifrost responses to integration-specific streaming format.
-// It takes a BifrostResponse and returns the streaming format expected by the specific integration.
-type StreamResponseConverter func(*schemas.BifrostResponse) (interface{}, error)
+// It takes a BifrostResponse and the index of the stream in the response and returns the streaming format expected by the specific integration.
+type StreamResponseConverter func(*schemas.BifrostResponse, int) (interface{}, error)
 
 // ErrorConverter is a function that converts BifrostError to integration-specific format.
 // It takes a BifrostError and returns the format expected by the specific integration.
@@ -433,6 +433,9 @@ func (g *GenericRouter) handleStreaming(ctx *fasthttp.RequestCtx, config RouteCo
 	ctx.Response.SetBodyStreamWriter(func(w *bufio.Writer) {
 		defer w.Flush()
 
+		// Received BifrostStream Index
+		streamIndex := -1
+
 		// Process streaming responses
 		for response := range streamChan {
 			if response == nil {
@@ -508,12 +511,14 @@ func (g *GenericRouter) handleStreaming(ctx *fasthttp.RequestCtx, config RouteCo
 
 			// Handle successful responses
 			if response.BifrostResponse != nil {
+				streamIndex++
+
 				// Convert response to integration-specific streaming format
 				var convertedResponse interface{}
 				var err error
 
 				if config.StreamConfig.ResponseConverter != nil {
-					convertedResponse, err = config.StreamConfig.ResponseConverter(response.BifrostResponse)
+					convertedResponse, err = config.StreamConfig.ResponseConverter(response.BifrostResponse, streamIndex)
 				} else {
 					// Fallback to regular response converter
 					convertedResponse, err = config.ResponseConverter(response.BifrostResponse)
diff --git a/transports/go.mod b/transports/go.mod
index c458370df9..74633a50b5 100644
--- a/transports/go.mod
+++ b/transports/go.mod
@@ -2,6 +2,8 @@ module github.com/maximhq/bifrost/transports
 
 go 1.24.1
 
+replace github.com/maximhq/bifrost/core => ../core
+
 require (
 	github.com/fasthttp/router v1.5.4
 	github.com/fasthttp/websocket v1.5.12