Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions core/providers/openai/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -1808,9 +1808,17 @@ func HandleOpenAIResponsesStreaming(
if response.Code != nil {
bifrostErr.Error.Code = response.Code
}
if response.Response != nil && response.Response.Error != nil {
if response.Response.Error.Message != "" && bifrostErr.Error.Message == "" {
bifrostErr.Error.Message = response.Response.Error.Message
}
if response.Response.Error.Code != "" && (bifrostErr.Error.Code == nil || *bifrostErr.Error.Code == "") {
bifrostErr.Error.Code = schemas.Ptr(response.Response.Error.Code)
}
}

ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true)
providerUtils.ProcessAndSendBifrostError(ctx, postHookRunner, providerUtils.EnrichError(ctx, bifrostErr, jsonBody, nil, sendBackRawRequest, sendBackRawResponse), responseChan, logger)
providerUtils.ProcessAndSendBifrostError(ctx, postHookRunner, providerUtils.EnrichError(ctx, bifrostErr, jsonBody, []byte(jsonData), sendBackRawRequest, sendBackRawResponse), responseChan, logger)
return
}

Expand All @@ -1832,7 +1840,7 @@ func HandleOpenAIResponsesStreaming(
bifrostErr.Error.Code = &response.Response.Error.Code
}
ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true)
providerUtils.ProcessAndSendBifrostError(ctx, postHookRunner, providerUtils.EnrichError(ctx, bifrostErr, jsonBody, nil, sendBackRawRequest, sendBackRawResponse), responseChan, logger)
providerUtils.ProcessAndSendBifrostError(ctx, postHookRunner, providerUtils.EnrichError(ctx, bifrostErr, jsonBody, []byte(jsonData), sendBackRawRequest, sendBackRawResponse), responseChan, logger)
return
}

Expand Down
19 changes: 13 additions & 6 deletions framework/streaming/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (a *Accumulator) createStreamAccumulator(requestID string) *StreamAccumulat
MaxResponsesChunkIndex: -1,
MaxTranscriptionChunkIndex: -1,
MaxAudioChunkIndex: -1,
TerminalErrorChunkIndex: -1,
IsComplete: false,
mu: sync.Mutex{},
Timestamp: now,
Expand Down Expand Up @@ -186,6 +187,7 @@ func (a *Accumulator) getOrCreateStreamAccumulator(requestID string) *StreamAccu
MaxResponsesChunkIndex: -1,
MaxTranscriptionChunkIndex: -1,
MaxAudioChunkIndex: -1,
TerminalErrorChunkIndex: -1,
IsComplete: false,
mu: sync.Mutex{},
Timestamp: now,
Expand Down Expand Up @@ -378,16 +380,21 @@ func (a *Accumulator) cleanupStreamAccumulator(requestID string) {
}
}


// ProcessStreamingResponse processes a streaming response or a streaming error;
// at least one of result and bifrostErr must be non-nil.
// It handles chat, audio, and responses stream request types.
func (a *Accumulator) ProcessStreamingResponse(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*ProcessedStreamResponse, error) {
// Check if this is a streaming response
if result == nil {
return nil, fmt.Errorf("result is nil")
// Check if at least one of result or error is provided
if result == nil && bifrostErr == nil {
return nil, fmt.Errorf("result and error are nil")
}

var requestType schemas.RequestType
if result != nil {
requestType = result.GetExtraFields().RequestType
} else if bifrostErr != nil {
requestType = bifrostErr.ExtraFields.RequestType
}
extraFields := result.GetExtraFields()
requestType := extraFields.RequestType

isAudioStreaming := requestType == schemas.SpeechStreamRequest || requestType == schemas.TranscriptionStreamRequest
isChatStreaming := requestType == schemas.ChatCompletionStreamRequest || requestType == schemas.TextCompletionStreamRequest
isResponsesStreaming := requestType == schemas.ResponsesStreamRequest
Expand Down
17 changes: 17 additions & 0 deletions framework/streaming/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"time"

"github.com/bytedance/sonic"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/core/schemas"
)
Expand Down Expand Up @@ -898,6 +899,22 @@ func (a *Accumulator) processResponsesStreamingResponse(ctx *schemas.BifrostCont

if bifrostErr != nil {
chunk.FinishReason = bifrost.Ptr("error")
if bifrostErr.ExtraFields.RawResponse != nil {
if rawBytes, marshalErr := sonic.Marshal(bifrostErr.ExtraFields.RawResponse); marshalErr == nil {
chunk.RawResponse = bifrost.Ptr(string(rawBytes))
}
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
// Assign a stable trailing index; reuse on duplicate plugin calls so dedup fires correctly.
accumulator := a.getOrCreateStreamAccumulator(requestID)
accumulator.mu.Lock()
if accumulator.TerminalErrorChunkIndex >= 0 {
chunk.ChunkIndex = accumulator.TerminalErrorChunkIndex
} else {
accumulator.MaxResponsesChunkIndex++
chunk.ChunkIndex = accumulator.MaxResponsesChunkIndex
accumulator.TerminalErrorChunkIndex = chunk.ChunkIndex
}
accumulator.mu.Unlock()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
} else if result != nil && result.ResponsesStreamResponse != nil {
if result.ResponsesStreamResponse.ExtraFields.RawResponse != nil {
chunk.RawResponse = bifrost.Ptr(fmt.Sprintf("%v", result.ResponsesStreamResponse.ExtraFields.RawResponse))
Expand Down
3 changes: 3 additions & 0 deletions framework/streaming/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type StreamAccumulator struct {
MaxTranscriptionChunkIndex int
MaxAudioChunkIndex int

// TerminalErrorChunkIndex holds the reserved chunk index for the terminal error (-1 = unset); reused across plugin calls for correct dedup.
TerminalErrorChunkIndex int

IsComplete bool
FinalTimestamp time.Time
mu sync.Mutex
Expand Down
19 changes: 17 additions & 2 deletions plugins/logging/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,21 @@ func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas.
if bifrost.IsStreamRequestType(requestType) {
entry.Stream = true
}

// For streaming errors, finalize and read accumulated chunks so logs retain pre-error stream metadata
if bifrost.IsStreamRequestType(requestType) &&
requestType != schemas.PassthroughStreamRequest &&
requestType != schemas.RealtimeRequest &&
tracer != nil &&
traceID != "" {
if accResult := tracer.ProcessStreamingChunk(traceID, true, result, bifrostErr); accResult != nil {
if streamResponse := convertToProcessedStreamResponse(accResult, requestType); streamResponse != nil {
p.applyStreamingOutputToEntry(entry, streamResponse)
}
}
tracer.CleanupStreamAccumulator(traceID)
}

// Serialize error details immediately since bifrostErr may be released
// back to the pool before the async batch writer processes this entry.
// Also set ErrorDetailsParsed for UI callback (JSON serialization uses this field).
Expand All @@ -688,14 +703,14 @@ func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas.
}
entry.ErrorDetailsParsed = bifrostErr
if p.disableContentLogging == nil || !*p.disableContentLogging {
if bifrostErr.ExtraFields.RawRequest != nil {
if entry.RawRequest == "" && bifrostErr.ExtraFields.RawRequest != nil {
rawReqBytes, err := sonic.Marshal(bifrostErr.ExtraFields.RawRequest)
if err == nil {
entry.RawRequest = string(rawReqBytes)
}
}

if bifrostErr.ExtraFields.RawResponse != nil {
if entry.RawResponse == "" && bifrostErr.ExtraFields.RawResponse != nil {
rawRespBytes, err := sonic.Marshal(bifrostErr.ExtraFields.RawResponse)
if err == nil {
entry.RawResponse = string(rawRespBytes)
Expand Down
31 changes: 15 additions & 16 deletions plugins/logging/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,12 @@ func (p *LoggerPlugin) applyStreamingOutputToEntry(entry *logstore.Log, streamRe
}
latF := float64(streamResponse.Data.Latency)
entry.Latency = &latF
return
} else {
entry.Status = "success"
latF := float64(streamResponse.Data.Latency)
entry.Latency = &latF
}

entry.Status = "success"
latF := float64(streamResponse.Data.Latency)
entry.Latency = &latF

// Update model if provided
if streamResponse.Data.Model != "" {
entry.Model = streamResponse.Data.Model
Expand Down Expand Up @@ -1093,19 +1092,19 @@ func buildResponseForRequestType(requestType schemas.RequestType, usage *schemas
CachedWriteTokens: usage.PromptTokensDetails.CachedWriteTokens,
}
}
if usage.CompletionTokensDetails != nil {
respUsage.OutputTokensDetails = &schemas.ResponsesResponseOutputTokens{
TextTokens: usage.CompletionTokensDetails.TextTokens,
AcceptedPredictionTokens: usage.CompletionTokensDetails.AcceptedPredictionTokens,
AudioTokens: usage.CompletionTokensDetails.AudioTokens,
ImageTokens: usage.CompletionTokensDetails.ImageTokens,
ReasoningTokens: usage.CompletionTokensDetails.ReasoningTokens,
RejectedPredictionTokens: usage.CompletionTokensDetails.RejectedPredictionTokens,
CitationTokens: usage.CompletionTokensDetails.CitationTokens,
NumSearchQueries: usage.CompletionTokensDetails.NumSearchQueries,
if usage.CompletionTokensDetails != nil {
respUsage.OutputTokensDetails = &schemas.ResponsesResponseOutputTokens{
TextTokens: usage.CompletionTokensDetails.TextTokens,
AcceptedPredictionTokens: usage.CompletionTokensDetails.AcceptedPredictionTokens,
AudioTokens: usage.CompletionTokensDetails.AudioTokens,
ImageTokens: usage.CompletionTokensDetails.ImageTokens,
ReasoningTokens: usage.CompletionTokensDetails.ReasoningTokens,
RejectedPredictionTokens: usage.CompletionTokensDetails.RejectedPredictionTokens,
CitationTokens: usage.CompletionTokensDetails.CitationTokens,
NumSearchQueries: usage.CompletionTokensDetails.NumSearchQueries,
}
}
}
}
return &schemas.BifrostResponse{
ResponsesResponse: &schemas.BifrostResponsesResponse{
Usage: respUsage,
Expand Down
Loading