Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
**/venv/
**/__pycache__/**
private.*
.venv
Comment thread
Pratham-Mishra04 marked this conversation as resolved.
76 changes: 76 additions & 0 deletions core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,82 @@ func (bifrost *Bifrost) prepareProvider(providerKey schemas.ModelProvider, confi
return nil
}

// UpdateProviderConcurrency dynamically updates the queue size and concurrency for an existing provider.
// This method gracefully stops existing workers, creates a new queue with updated settings,
// and starts new workers with the updated concurrency configuration.
//
// Parameters:
// - providerKey: The provider to update
//
// Returns:
// - error: Any error that occurred during the update process
//
// Note: This operation will temporarily pause request processing for the specified provider
// while the transition occurs. In-flight requests will complete before workers are stopped.
func (bifrost *Bifrost) UpdateProviderConcurrency(providerKey schemas.ModelProvider) error {
bifrost.logger.Info(fmt.Sprintf("Updating concurrency configuration for provider %s", providerKey))

// Get the updated configuration from the account
providerConfig, err := bifrost.account.GetConfigForProvider(providerKey)
if err != nil {
return fmt.Errorf("failed to get updated config for provider %s: %v", providerKey, err)
}

// Check if provider currently exists
oldQueue, exists := bifrost.requestQueues[providerKey]
if !exists {
bifrost.logger.Debug(fmt.Sprintf("Provider %s not currently active, initializing with new configuration", providerKey))
// If provider doesn't exist, just prepare it with new configuration
return bifrost.prepareProvider(providerKey, providerConfig)
}

// Check if the provider has any keys (skip keyless providers)
if providerRequiresKey(providerKey) {
keys, err := bifrost.account.GetKeysForProvider(providerKey)
if err != nil || len(keys) == 0 {
return fmt.Errorf("failed to get keys for provider %s: %v", providerKey, err)
}
}

bifrost.logger.Debug(fmt.Sprintf("Gracefully stopping existing workers for provider %s", providerKey))

// Step 1: Close the existing queue to signal workers to stop processing new requests
close(oldQueue)

// Step 2: Wait for all existing workers to finish processing in-flight requests
if waitGroup, exists := bifrost.waitGroups[providerKey]; exists {
waitGroup.Wait()
bifrost.logger.Debug(fmt.Sprintf("All workers for provider %s have stopped", providerKey))
}

// Step 3: Create new queue with updated buffer size
newQueue := make(chan ChannelMessage, providerConfig.ConcurrencyAndBufferSize.BufferSize)
bifrost.requestQueues[providerKey] = newQueue

// Step 4: Create new wait group for the updated workers
bifrost.waitGroups[providerKey] = &sync.WaitGroup{}

// Step 5: Create provider instance
provider, err := bifrost.createProviderFromProviderKey(providerKey, providerConfig)
if err != nil {
return fmt.Errorf("failed to create provider instance for %s: %v", providerKey, err)
}

// Step 6: Start new workers with updated concurrency
bifrost.logger.Debug(fmt.Sprintf("Starting %d new workers for provider %s with buffer size %d",
providerConfig.ConcurrencyAndBufferSize.Concurrency,
providerKey,
providerConfig.ConcurrencyAndBufferSize.BufferSize))

for range providerConfig.ConcurrencyAndBufferSize.Concurrency {
bifrost.waitGroups[providerKey].Add(1)
go bifrost.requestWorker(provider, newQueue)
}

bifrost.logger.Info(fmt.Sprintf("Successfully updated concurrency configuration for provider %s", providerKey))
return nil
}
Comment thread
Pratham-Mishra04 marked this conversation as resolved.

// Init initializes a new Bifrost instance with the given configuration.
// It sets up the account, plugins, object pools, and initializes providers.
// Returns an error if initialization fails.
Expand Down
138 changes: 138 additions & 0 deletions transports/bifrost-http/handlers/completions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Package handlers provides HTTP request handlers for the Bifrost HTTP transport.
// This file contains completion request handlers for text and chat completions.
package handlers

import (
"encoding/json"
"fmt"

"github.com/fasthttp/router"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/transports/bifrost-http/lib"
"github.com/valyala/fasthttp"
)

// CompletionHandler manages HTTP requests for completion operations
type CompletionHandler struct {
client *bifrost.Bifrost
logger schemas.Logger
}

// NewCompletionHandler creates a new completion handler instance
func NewCompletionHandler(client *bifrost.Bifrost, logger schemas.Logger) *CompletionHandler {
return &CompletionHandler{
client: client,
logger: logger,
}
}

// CompletionRequest represents a request for either text or chat completion
type CompletionRequest struct {
Provider schemas.ModelProvider `json:"provider"` // The AI model provider to use
Messages []schemas.BifrostMessage `json:"messages"` // Chat messages (for chat completion)
Text string `json:"text"` // Text input (for text completion)
Model string `json:"model"` // Model to use
Params *schemas.ModelParameters `json:"params"` // Additional model parameters
Fallbacks []schemas.Fallback `json:"fallbacks"` // Fallback providers and models
}

type CompletionType string

const (
CompletionTypeText CompletionType = "text"
CompletionTypeChat CompletionType = "chat"
)

// RegisterRoutes registers all completion-related routes
func (h *CompletionHandler) RegisterRoutes(r *router.Router) {
// Completion endpoints
r.POST("/v1/text/completions", h.TextCompletion)
r.POST("/v1/chat/completions", h.ChatCompletion)
}

// TextCompletion handles POST /v1/text/completions - Process text completion requests
func (h *CompletionHandler) TextCompletion(ctx *fasthttp.RequestCtx) {
h.handleCompletion(ctx, CompletionTypeText)
}

// ChatCompletion handles POST /v1/chat/completions - Process chat completion requests
func (h *CompletionHandler) ChatCompletion(ctx *fasthttp.RequestCtx) {
h.handleCompletion(ctx, CompletionTypeChat)
}

// handleCompletion processes both text and chat completion requests
// It handles request parsing, validation, and response formatting
func (h *CompletionHandler) handleCompletion(ctx *fasthttp.RequestCtx, completionType CompletionType) {
var req CompletionRequest
if err := json.Unmarshal(ctx.PostBody(), &req); err != nil {
SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid request format: %v", err), h.logger)
return
}

// Validate required fields
if req.Provider == "" {
SendError(ctx, fasthttp.StatusBadRequest, "Provider is required", h.logger)
return
}

if req.Model == "" {
SendError(ctx, fasthttp.StatusBadRequest, "Model is required", h.logger)
return
}

// Create BifrostRequest
bifrostReq := &schemas.BifrostRequest{
Provider: req.Provider,
Model: req.Model,
Params: req.Params,
Fallbacks: req.Fallbacks,
}

// Validate and set input based on completion type
switch completionType {
case CompletionTypeText:
if req.Text == "" {
SendError(ctx, fasthttp.StatusBadRequest, "Text is required for text completion", h.logger)
return
}
bifrostReq.Input = schemas.RequestInput{
TextCompletionInput: &req.Text,
}
case CompletionTypeChat:
if len(req.Messages) == 0 {
SendError(ctx, fasthttp.StatusBadRequest, "Messages array is required for chat completion", h.logger)
return
}
bifrostReq.Input = schemas.RequestInput{
ChatCompletionInput: &req.Messages,
}
}

// Convert context
bifrostCtx := lib.ConvertToBifrostContext(ctx)
if bifrostCtx == nil {
SendError(ctx, fasthttp.StatusInternalServerError, "Failed to convert context", h.logger)
return
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Execute request
var resp *schemas.BifrostResponse
var bifrostErr *schemas.BifrostError

switch completionType {
case CompletionTypeText:
resp, bifrostErr = h.client.TextCompletionRequest(*bifrostCtx, bifrostReq)
case CompletionTypeChat:
resp, bifrostErr = h.client.ChatCompletionRequest(*bifrostCtx, bifrostReq)
}

// Handle response
if bifrostErr != nil {
SendBifrostError(ctx, bifrostErr, h.logger)
return
}

// Send successful response
SendJSON(ctx, resp, h.logger)
}
41 changes: 41 additions & 0 deletions transports/bifrost-http/handlers/integrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Package handlers provides HTTP request handlers for the Bifrost HTTP transport.
// This file contains integration management handlers for AI provider integrations.
package handlers

import (
"github.com/fasthttp/router"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/transports/bifrost-http/integrations"
"github.com/maximhq/bifrost/transports/bifrost-http/integrations/anthropic"
"github.com/maximhq/bifrost/transports/bifrost-http/integrations/genai"
"github.com/maximhq/bifrost/transports/bifrost-http/integrations/litellm"
"github.com/maximhq/bifrost/transports/bifrost-http/integrations/openai"
)

// IntegrationHandler manages HTTP requests for AI provider integrations
type IntegrationHandler struct {
extensions []integrations.ExtensionRouter
}

// NewIntegrationHandler creates a new integration handler instance
func NewIntegrationHandler(client *bifrost.Bifrost) *IntegrationHandler {
// Initialize all available integration routers
extensions := []integrations.ExtensionRouter{
genai.NewGenAIRouter(client),
openai.NewOpenAIRouter(client),
anthropic.NewAnthropicRouter(client),
litellm.NewLiteLLMRouter(client),
}

return &IntegrationHandler{
extensions: extensions,
}
}

// RegisterRoutes registers all integration routes for AI provider compatibility endpoints
func (h *IntegrationHandler) RegisterRoutes(r *router.Router) {
// Register routes for each integration extension
for _, extension := range h.extensions {
extension.RegisterRoutes(r)
}
}
71 changes: 71 additions & 0 deletions transports/bifrost-http/handlers/mcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Package handlers provides HTTP request handlers for the Bifrost HTTP transport.
// This file contains MCP (Model Context Protocol) tool execution handlers.
package handlers

import (
"encoding/json"
"fmt"

"github.com/fasthttp/router"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/transports/bifrost-http/lib"
"github.com/valyala/fasthttp"
)

// MCPHandler manages HTTP requests for MCP tool operations
type MCPHandler struct {
client *bifrost.Bifrost
logger schemas.Logger
}

// NewMCPHandler creates a new MCP handler instance
func NewMCPHandler(client *bifrost.Bifrost, logger schemas.Logger) *MCPHandler {
return &MCPHandler{
client: client,
logger: logger,
}
}

// RegisterRoutes registers all MCP-related routes
func (h *MCPHandler) RegisterRoutes(r *router.Router) {
// MCP tool execution endpoint
r.POST("/v1/mcp/tool/execute", h.ExecuteTool)
}

// ExecuteTool handles POST /v1/mcp/tool/execute - Execute MCP tool
func (h *MCPHandler) ExecuteTool(ctx *fasthttp.RequestCtx) {
var req schemas.ToolCall
if err := json.Unmarshal(ctx.PostBody(), &req); err != nil {
SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid request format: %v", err), h.logger)
return
}

// Validate required fields
if req.Function.Name == nil || *req.Function.Name == "" {
SendError(ctx, fasthttp.StatusBadRequest, "Tool function name is required", h.logger)
return
}

// Convert context
bifrostCtx := lib.ConvertToBifrostContext(ctx)
if bifrostCtx == nil {
SendError(ctx, fasthttp.StatusInternalServerError, "Failed to convert context", h.logger)
return
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Execute MCP tool
resp, bifrostErr := h.client.ExecuteMCPTool(*bifrostCtx, req)
if bifrostErr != nil {
SendBifrostError(ctx, bifrostErr, h.logger)
return
}

// Send successful response
ctx.SetStatusCode(fasthttp.StatusOK)
ctx.SetContentType("application/json")
if encodeErr := json.NewEncoder(ctx).Encode(resp); encodeErr != nil {
h.logger.Warn(fmt.Sprintf("Failed to encode response: %v", encodeErr))
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to encode response: %v", encodeErr), h.logger)
}
}
Loading