Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/maximhq/bifrost/core/providers"
Expand Down Expand Up @@ -57,9 +58,9 @@ type Bifrost struct {
responseChannelPool sync.Pool // Pool for response channels, initial pool size is set in Init
errorChannelPool sync.Pool // Pool for error channels, initial pool size is set in Init
logger schemas.Logger // logger instance, default logger is used if not provided
dropExcessRequests bool // If true, in cases where the queue is full, requests will not wait for the queue to be empty and will be dropped instead.
backgroundCtx context.Context // Shared background context for nil context handling
mcpManager *MCPManager // MCP integration manager (nil if MCP not configured)
dropExcessRequests atomic.Bool // If true, in cases where the queue is full, requests will not wait for the queue to be empty and will be dropped instead.
}

// PluginPipeline encapsulates the execution of plugin PreHooks and PostHooks, tracks how many plugins ran, and manages short-circuiting and error aggregation.
Expand Down Expand Up @@ -281,13 +282,13 @@ func Init(config schemas.BifrostConfig) (*Bifrost, error) {
}

bifrost := &Bifrost{
account: config.Account,
plugins: config.Plugins,
waitGroups: make(map[schemas.ModelProvider]*sync.WaitGroup),
requestQueues: make(map[schemas.ModelProvider]chan ChannelMessage),
dropExcessRequests: config.DropExcessRequests,
backgroundCtx: context.Background(),
account: config.Account,
plugins: config.Plugins,
waitGroups: make(map[schemas.ModelProvider]*sync.WaitGroup),
requestQueues: make(map[schemas.ModelProvider]chan ChannelMessage),
backgroundCtx: context.Background(),
}
bifrost.dropExcessRequests.Store(config.DropExcessRequests)

// Initialize object pools
bifrost.channelMessagePool = sync.Pool{
Expand Down Expand Up @@ -576,7 +577,7 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, queue chan Chan
}
break
}

result, bifrostError = executor(provider, &req, key)
if bifrostError != nil && !bifrostError.IsBifrostError {
break // Don't retry client errors
Expand Down Expand Up @@ -919,7 +920,7 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
bifrost.releaseChannelMessage(msg)
return nil, newBifrostErrorFromMsg("request cancelled while waiting for queue space")
default:
if bifrost.dropExcessRequests {
if bifrost.dropExcessRequests.Load() {
bifrost.releaseChannelMessage(msg)
bifrost.logger.Warn("Request dropped: queue is full, please increase the queue size or set dropExcessRequests to false")
return nil, newBifrostErrorFromMsg("request dropped: queue is full")
Expand Down Expand Up @@ -958,6 +959,13 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont
}
}

// UpdateDropExcessRequests updates the DropExcessRequests setting at runtime.
// This allows for hot-reloading of this configuration value.
func (bifrost *Bifrost) UpdateDropExcessRequests(value bool) {
bifrost.dropExcessRequests.Store(value)
bifrost.logger.Info(fmt.Sprintf("DropExcessRequests updated to: %v", value))
}

// ExecuteMCPTool executes an MCP tool call and returns the result as a tool message.
// This is the main public API for manual MCP tool execution.
//
Expand Down
6 changes: 2 additions & 4 deletions transports/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@ USER appuser
# Environment variables with defaults
ENV APP_PORT=8080 \
APP_POOL_SIZE=300 \
APP_DROP_EXCESS_REQUESTS=false \
APP_PLUGINS="" \
APP_PROMETHEUS_LABELS=""
APP_PLUGINS=""

EXPOSE 8080

# Direct entrypoint with environment variable expansion
ENTRYPOINT ["/bin/sh", "-c", "exec /app/main -config /app/config/config.json -port \"${APP_PORT}\" -pool-size \"${APP_POOL_SIZE}\" -drop-excess-requests \"${APP_DROP_EXCESS_REQUESTS}\" -plugins \"${APP_PLUGINS}\" -prometheus-labels \"${APP_PROMETHEUS_LABELS}\""]
ENTRYPOINT ["/bin/sh", "-c", "exec /app/main -config /app/config/config.json -port \"${APP_PORT}\" -pool-size \"${APP_POOL_SIZE}\" -plugins \"${APP_PLUGINS}\""]
65 changes: 65 additions & 0 deletions transports/bifrost-http/handlers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package handlers

import (
"encoding/json"
"fmt"
"os"

"github.com/fasthttp/router"
bifrost "github.com/maximhq/bifrost/core"
"github.com/maximhq/bifrost/core/schemas"
"github.com/valyala/fasthttp"
)

// ConfigHandler manages runtime configuration updates for Bifrost.
// It provides an endpoint to hot-reload settings from the configuration file.
type ConfigHandler struct {
client *bifrost.Bifrost
logger schemas.Logger
configPath string
}

// NewConfigHandler creates a new handler for configuration management.
// It requires the Bifrost client, a logger, and the path to the config file to be reloaded.
func NewConfigHandler(client *bifrost.Bifrost, logger schemas.Logger, configPath string) *ConfigHandler {
return &ConfigHandler{
client: client,
logger: logger,
configPath: configPath,
}
}

// RegisterRoutes registers the configuration-related routes.
// It adds the `PUT /config` endpoint.
func (h *ConfigHandler) RegisterRoutes(r *router.Router) {
r.PUT("/config", h.handleReloadConfig)
}

// handleReloadConfig re-reads the configuration file and applies updatable settings.
// Currently, it supports hot-reloading of the `drop_excess_requests` setting.
// Note that settings like `prometheus_labels` cannot be changed at runtime.
func (h *ConfigHandler) handleReloadConfig(ctx *fasthttp.RequestCtx) {
var config struct {
BifrostSettings struct {
DropExcessRequests *bool `json:"drop_excess_requests,omitempty"`
} `json:"bifrost_settings"`
}
Comment thread
Pratham-Mishra04 marked this conversation as resolved.

data, err := os.ReadFile(h.configPath)
if err != nil {
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("failed to read config file: %v", err), h.logger)
return
}

if err := json.Unmarshal(data, &config); err != nil {
SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("failed to parse config file: %v", err), h.logger)
return
}

if config.BifrostSettings.DropExcessRequests != nil {
h.client.UpdateDropExcessRequests(*config.BifrostSettings.DropExcessRequests)
}

ctx.SetStatusCode(fasthttp.StatusOK)
SendJSON(ctx, map[string]interface{}{"status": "config reloaded", "drop_excess_requests": config.BifrostSettings.DropExcessRequests}, h.logger)
}
54 changes: 29 additions & 25 deletions transports/bifrost-http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"log"
Expand All @@ -72,48 +73,31 @@ import (

// Command line flags
var (
initialPoolSize int // Initial size of the connection pool
dropExcessRequests bool // Drop excess requests
port string // Port to run the server on
configPath string // Path to the config file
pluginsToLoad []string // Path to the plugins
prometheusLabels []string // Labels to add to Prometheus metrics (optional)
initialPoolSize int // Initial size of the connection pool
port string // Port to run the server on
configPath string // Path to the config file
pluginsToLoad []string // Path to the plugins
)

// init initializes command line flags and validates required configuration.
// It sets up the following flags:
// - pool-size: Initial connection pool size (default: 300)
// - port: Server port (default: 8080)
// - config: Path to config file (required)
// - drop-excess-requests: Whether to drop excess requests
func init() {
pluginString := ""
var prometheusLabelsString string

flag.IntVar(&initialPoolSize, "pool-size", 300, "Initial pool size for Bifrost")
flag.StringVar(&port, "port", "8080", "Port to run the server on")
flag.StringVar(&configPath, "config", "", "Path to the config file")
flag.BoolVar(&dropExcessRequests, "drop-excess-requests", false, "Drop excess requests")
flag.StringVar(&pluginString, "plugins", "", "Comma separated list of plugins to load")
flag.StringVar(&prometheusLabelsString, "prometheus-labels", "", "Labels to add to Prometheus metrics")
flag.Parse()

pluginsToLoad = strings.Split(pluginString, ",")

if configPath == "" {
log.Fatalf("config path is required")
}

if prometheusLabelsString != "" {
// Split and filter out empty strings
rawLabels := strings.Split(prometheusLabelsString, ",")
prometheusLabels = make([]string, 0, len(rawLabels))
for _, label := range rawLabels {
if trimmed := strings.TrimSpace(label); trimmed != "" {
prometheusLabels = append(prometheusLabels, strings.ToLower(trimmed))
}
}
}
}

// registerCollectorSafely attempts to register a Prometheus collector,
Expand Down Expand Up @@ -144,11 +128,29 @@ func main() {
registerCollectorSafely(collectors.NewGoCollector())
registerCollectorSafely(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))

tracking.InitPrometheusMetrics(prometheusLabels)
logger := bifrost.NewDefaultLogger(schemas.LogLevelInfo)

log.Println("Prometheus Go/Process collectors registered.")
// Define a struct to unmarshal the entire config file
var config struct {
Client struct {
DropExcessRequests bool `json:"drop_excess_requests"`
PrometheusLabels []string `json:"prometheus_labels"`
} `json:"client"`
Providers json.RawMessage `json:"providers"`
MCP *schemas.MCPConfig `json:"mcp"`
}

logger := bifrost.NewDefaultLogger(schemas.LogLevelInfo)
// Read and parse config
data, err := os.ReadFile(configPath)
if err != nil {
log.Fatalf("failed to read config file: %v", err)
}
if err := json.Unmarshal(data, &config); err != nil {
log.Fatalf("failed to parse config JSON: %v", err)
}

tracking.InitPrometheusMetrics(config.Client.PrometheusLabels)
log.Println("Prometheus Go/Process collectors registered.")

// Initialize high-performance configuration store with caching
store, err := lib.NewConfigStore(logger)
Expand Down Expand Up @@ -200,7 +202,7 @@ func main() {
client, err := bifrost.Init(schemas.BifrostConfig{
Account: account,
InitialPoolSize: initialPoolSize,
DropExcessRequests: dropExcessRequests,
DropExcessRequests: config.Client.DropExcessRequests,
Plugins: loadedPlugins,
MCPConfig: mcpConfig,
Logger: logger,
Expand All @@ -214,6 +216,7 @@ func main() {
completionHandler := handlers.NewCompletionHandler(client, logger)
mcpHandler := handlers.NewMCPHandler(client, logger)
integrationHandler := handlers.NewIntegrationHandler(client)
configHandler := handlers.NewConfigHandler(client, logger, configPath)

r := router.New()

Expand All @@ -222,6 +225,7 @@ func main() {
completionHandler.RegisterRoutes(r)
mcpHandler.RegisterRoutes(r)
integrationHandler.RegisterRoutes(r)
configHandler.RegisterRoutes(r)

// Add Prometheus /metrics endpoint
r.GET("/metrics", fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler()))
Expand Down
4 changes: 4 additions & 0 deletions transports/config.example.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"client": {
"drop_excess_requests": false,
"prometheus_labels": ["model", "provider"]
},
"providers": {
"openai": {
"keys": [
Expand Down