Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- fix: bedrock responses streaming last chunk indicator fixes
- fix: gemini nil content check fixes
- fix: handle responses.incomplete event in openai responses streaming
- enhancements: provider tests enhancements
6 changes: 5 additions & 1 deletion core/providers/bedrock/bedrock.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func (provider *BedrockProvider) ResponsesStream(ctx context.Context, postHookRu
if err == io.EOF {
// End of stream - finalize any open items
finalResponses := FinalizeBedrockStream(streamState, chunkIndex, usage)
for _, finalResponse := range finalResponses {
for i, finalResponse := range finalResponses {
finalResponse.ExtraFields = schemas.BifrostResponseExtraFields{
RequestType: schemas.ResponsesStreamRequest,
Provider: providerName,
Expand All @@ -922,6 +922,10 @@ func (provider *BedrockProvider) ResponsesStream(ctx context.Context, postHookRu
finalResponse.ExtraFields.RawResponse = "{}" // Final event has no payload
}

if i == len(finalResponses)-1 {
finalResponse.ExtraFields.Latency = time.Since(startTime).Milliseconds()
}

providerUtils.ProcessAndSendResponse(ctx, postHookRunner, providerUtils.GetBifrostResponseForStreamResponse(nil, nil, finalResponse, nil, nil), responseChan)
}
break
Expand Down
50 changes: 27 additions & 23 deletions core/providers/gemini/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,18 +413,20 @@ func convertBifrostMessagesToGemini(messages []schemas.ChatMessage) []Content {
var parts []*Part

// Handle content
if message.Content.ContentStr != nil && *message.Content.ContentStr != "" {
parts = append(parts, &Part{
Text: *message.Content.ContentStr,
})
} else if message.Content.ContentBlocks != nil {
for _, block := range message.Content.ContentBlocks {
if block.Text != nil {
parts = append(parts, &Part{
Text: *block.Text,
})
if message.Content != nil {
if message.Content.ContentStr != nil && *message.Content.ContentStr != "" {
parts = append(parts, &Part{
Text: *message.Content.ContentStr,
})
} else if message.Content.ContentBlocks != nil {
for _, block := range message.Content.ContentBlocks {
if block.Text != nil {
parts = append(parts, &Part{
Text: *block.Text,
})
}
// Handle other content block types as needed
}
// Handle other content block types as needed
}
}

Expand Down Expand Up @@ -472,19 +474,21 @@ func convertBifrostMessagesToGemini(messages []schemas.ChatMessage) []Content {
var responseData map[string]any
var contentStr string

// Extract content string from ContentStr or ContentBlocks
if message.Content.ContentStr != nil && *message.Content.ContentStr != "" {
contentStr = *message.Content.ContentStr
} else if message.Content.ContentBlocks != nil {
// Fallback: try to extract text from content blocks
var textParts []string
for _, block := range message.Content.ContentBlocks {
if block.Text != nil && *block.Text != "" {
textParts = append(textParts, *block.Text)
if message.Content != nil {
// Extract content string from ContentStr or ContentBlocks
if message.Content.ContentStr != nil && *message.Content.ContentStr != "" {
contentStr = *message.Content.ContentStr
} else if message.Content.ContentBlocks != nil {
// Fallback: try to extract text from content blocks
var textParts []string
for _, block := range message.Content.ContentBlocks {
if block.Text != nil && *block.Text != "" {
textParts = append(textParts, *block.Text)
}
}
if len(textParts) > 0 {
contentStr = strings.Join(textParts, "\n")
}
}
if len(textParts) > 0 {
contentStr = strings.Join(textParts, "\n")
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/providers/openai/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ func HandleOpenAIResponsesStreaming(
response.ExtraFields.ModelRequested = request.Model
response.ExtraFields.ChunkIndex = response.SequenceNumber

if response.Type == schemas.ResponsesStreamResponseTypeCompleted {
if response.Type == schemas.ResponsesStreamResponseTypeCompleted || response.Type == schemas.ResponsesStreamResponseTypeIncomplete {
response.ExtraFields.Latency = time.Since(startTime).Milliseconds()
ctx = context.WithValue(ctx, schemas.BifrostContextKeyStreamEndIndicator, true)
providerUtils.ProcessAndSendResponse(ctx, postHookRunner, providerUtils.GetBifrostResponseForStreamResponse(nil, nil, &response, nil, nil), responseChan)
Expand Down
2 changes: 1 addition & 1 deletion core/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.29
1.2.30
3 changes: 2 additions & 1 deletion framework/changelog.md
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
feature: Qdrant Vector Search Support (#893)
- feat: Qdrant Vector Search Support (#893)
- fix: stream accumulator nil content check fixes
9 changes: 6 additions & 3 deletions framework/streaming/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (a *Accumulator) accumulateToolCallsInMessage(message *schemas.ChatMessage,
}
toolCallToModify = &schemas.ChatAssistantMessageToolCall{
Index: uint16(len(existingToolCalls)),
ID: deltaToolCall.ID,
ID: deltaToolCall.ID,
Function: schemas.ChatAssistantMessageToolCallFunction{
Name: deltaToolCall.Function.Name,
Arguments: args,
Expand All @@ -269,10 +269,10 @@ func (a *Accumulator) appendContentToMessage(message *schemas.ChatMessage, newCo
if message == nil {
return
}
if message.Content.ContentStr != nil {
if message.Content != nil && message.Content.ContentStr != nil {
// Append to existing string content
*message.Content.ContentStr += newContent
} else if message.Content.ContentBlocks != nil {
} else if message.Content != nil && message.Content.ContentBlocks != nil {
// Find the last text block and append, or create new one
blocks := message.Content.ContentBlocks
if len(blocks) > 0 && blocks[len(blocks)-1].Type == schemas.ChatContentBlockTypeText && blocks[len(blocks)-1].Text != nil {
Expand All @@ -287,6 +287,9 @@ func (a *Accumulator) appendContentToMessage(message *schemas.ChatMessage, newCo
message.Content.ContentBlocks = blocks
}
} else {
if message.Content == nil {
message.Content = &schemas.ChatMessageContent{}
}
// Initialize with string content
message.Content.ContentStr = &newContent
}
Expand Down
8 changes: 0 additions & 8 deletions framework/streaming/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,6 @@ func (a *Accumulator) processAccumulatedResponsesStreamingChunks(requestID strin
data.EndTimestamp = accumulator.FinalTimestamp
data.OutputMessages = completeMessages

// Extract tool calls from messages
for _, msg := range completeMessages {
if msg.ResponsesToolMessage != nil {
// Add tool call info to accumulated data
// This is simplified - you might want to extract specific tool call info
}
}

data.ErrorDetails = respErr

// Update token usage from final chunk if available
Expand Down
2 changes: 1 addition & 1 deletion framework/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.37
1.1.38
1 change: 1 addition & 0 deletions plugins/governance/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
2 changes: 1 addition & 1 deletion plugins/governance/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.38
1.3.39
1 change: 1 addition & 0 deletions plugins/jsonparser/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
2 changes: 1 addition & 1 deletion plugins/jsonparser/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.38
1.3.39
1 change: 1 addition & 0 deletions plugins/logging/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
2 changes: 1 addition & 1 deletion plugins/logging/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.38
1.3.39
1 change: 1 addition & 0 deletions plugins/maxim/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
2 changes: 1 addition & 1 deletion plugins/maxim/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.37
1.4.38
1 change: 1 addition & 0 deletions plugins/mocker/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
7 changes: 3 additions & 4 deletions plugins/mocker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,11 @@ func validateErrorResponse(errorContent ErrorResponse) error {
return nil
}



// GetName returns the plugin name
func (p *MockerPlugin) GetName() string {
return PluginName
}

// TransportInterceptor is not used for this plugin
func (p *MockerPlugin) TransportInterceptor(ctx *context.Context, url string, headers map[string]string, body map[string]any) (map[string]string, map[string]any, error) {
return headers, body, nil
Expand Down Expand Up @@ -669,7 +668,7 @@ func (p *MockerPlugin) extractMessageContentFast(req *schemas.BifrostRequest) st

// Fast path for single message
if len(messages) == 1 {
if messages[0].Content.ContentStr != nil {
if messages[0].Content != nil && messages[0].Content.ContentStr != nil {
return *messages[0].Content.ContentStr
}
return ""
Expand All @@ -678,7 +677,7 @@ func (p *MockerPlugin) extractMessageContentFast(req *schemas.BifrostRequest) st
// Multiple messages - use string builder for efficiency
var builder strings.Builder
for i, message := range messages {
if message.Content.ContentStr != nil {
if message.Content != nil && message.Content.ContentStr != nil {
if i > 0 {
builder.WriteByte(' ')
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/mocker/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.37
1.3.38
1 change: 1 addition & 0 deletions plugins/otel/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
2 changes: 1 addition & 1 deletion plugins/otel/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.37
1.0.38
1 change: 1 addition & 0 deletions plugins/semanticcache/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
2 changes: 1 addition & 1 deletion plugins/semanticcache/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.37
1.3.38
1 change: 1 addition & 0 deletions plugins/telemetry/changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- chore: upgrades core to 1.2.30 and framework to 1.1.38
2 changes: 1 addition & 1 deletion plugins/telemetry/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.3.37
1.3.38
46 changes: 36 additions & 10 deletions tests/core-providers/scenarios/chat_completion_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scenarios

import (
"context"
"fmt"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -186,9 +187,7 @@ func RunChatCompletionStreamTest(t *testing.T, client *bifrost.Bifrost, ctx cont
// Enhanced validation expectations for streaming
expectations := GetExpectationsForScenario("ChatCompletionStream", testConfig, map[string]interface{}{})
expectations = ModifyExpectationsForProvider(expectations, testConfig.Provider)
expectations.ShouldContainAnyOf = append(expectations.ShouldContainAnyOf, []string{"paris"}...) // Should include story elements
expectations.MinContentLength = 50 // Should be substantial story
expectations.MaxContentLength = 2000 // Reasonable upper bound
expectations.ShouldContainAnyOf = append(expectations.ShouldContainAnyOf, []string{"paris"}...) // Should include story elements // Reasonable upper bound

// Validate the consolidated streaming response
validationResult := ValidateChatResponse(t, consolidatedResponse, nil, expectations, "ChatCompletionStream")
Expand Down Expand Up @@ -240,10 +239,36 @@ func RunChatCompletionStreamTest(t *testing.T, client *bifrost.Bifrost, ctx cont
Fallbacks: testConfig.Fallbacks,
}

responseChannel, err := client.ChatCompletionStreamRequest(ctx, request)
RequireNoError(t, err, "Chat completion stream with tools failed")
// Use retry framework for stream requests with tools
retryConfig := StreamingRetryConfig()
retryContext := TestRetryContext{
ScenarioName: "ChatCompletionStreamWithTools",
ExpectedBehavior: map[string]interface{}{
"should_stream_content": true,
"should_have_tool_calls": true,
"tool_name": "get_weather",
},
TestMetadata: map[string]interface{}{
"provider": testConfig.Provider,
"model": testConfig.ChatModel,
"tools": true,
},
}

responseChannel, err := WithStreamRetry(t, retryConfig, retryContext, func() (chan *schemas.BifrostStream, *schemas.BifrostError) {
return client.ChatCompletionStreamRequest(ctx, request)
})

// Enhanced error handling with explicit logging
if err != nil {
errorMsg := GetErrorMessage(err)
if !strings.Contains(errorMsg, "❌") {
errorMsg = fmt.Sprintf("❌ %s", errorMsg)
}
t.Fatalf("❌ Chat completion stream with tools failed after retries: %s", errorMsg)
}
if responseChannel == nil {
t.Fatal("Response channel should not be nil")
t.Fatalf("❌ Response channel should not be nil")
}

var toolCallDetected bool
Expand All @@ -262,7 +287,7 @@ func RunChatCompletionStreamTest(t *testing.T, client *bifrost.Bifrost, ctx cont
}

if response == nil || response.BifrostChatResponse == nil {
t.Fatal("Streaming response should not be nil")
t.Fatalf("❌ Streaming response should not be nil")
}
responseCount++

Expand Down Expand Up @@ -294,16 +319,17 @@ func RunChatCompletionStreamTest(t *testing.T, client *bifrost.Bifrost, ctx cont
}

case <-streamCtx.Done():
t.Fatal("Timeout waiting for streaming response with tools")
t.Fatalf("❌ Timeout waiting for streaming response with tools")
}
}

toolStreamComplete:
if responseCount == 0 {
t.Fatal("Should receive at least one streaming response")
t.Fatalf("❌ Should receive at least one streaming response")
}
if !toolCallDetected {
t.Fatal("Should detect tool calls in streaming response")
// Log error before failing - this is a validation failure
t.Fatalf("❌ Should detect tool calls in streaming response (received %d chunks but no tool calls)", responseCount)
}
t.Logf("✅ Streaming with tools test completed successfully")
})
Expand Down
4 changes: 0 additions & 4 deletions tests/core-providers/scenarios/complete_end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ func RunCompleteEnd2EndTest(t *testing.T, client *bifrost.Bifrost, ctx context.C
// Enhanced validation for step 2 - should acknowledge tool results
expectations2 := ConversationExpectations([]string{"weather", "temperature"})
expectations2 = ModifyExpectationsForProvider(expectations2, testConfig.Provider)
expectations2.MinContentLength = 15 // Should provide meaningful response to tool result
expectations2.MaxContentLength = 500 // Reasonable upper bound for tool result processing
expectations2.ShouldNotContainWords = []string{
"cannot help", "don't understand", "no information",
"unable to process", "invalid tool result",
Expand Down Expand Up @@ -330,8 +328,6 @@ func RunCompleteEnd2EndTest(t *testing.T, client *bifrost.Bifrost, ctx context.C

// Enhanced validation for final response
expectations3 = ModifyExpectationsForProvider(expectations3, testConfig.Provider)
expectations3.MinContentLength = 20 // Should provide some meaningful response
expectations3.MaxContentLength = 800 // End-to-end can be verbose
expectations3.ShouldNotContainWords = []string{
"cannot help", "don't understand", "confused",
"start over", "reset conversation",
Expand Down
Loading