diff --git a/core/bifrost.go b/core/bifrost.go index 9eeaac54be..0777d8f61e 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -10,6 +10,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" "github.com/maximhq/bifrost/core/providers" @@ -57,9 +58,9 @@ type Bifrost struct { responseChannelPool sync.Pool // Pool for response channels, initial pool size is set in Init errorChannelPool sync.Pool // Pool for error channels, initial pool size is set in Init logger schemas.Logger // logger instance, default logger is used if not provided - dropExcessRequests bool // If true, in cases where the queue is full, requests will not wait for the queue to be empty and will be dropped instead. backgroundCtx context.Context // Shared background context for nil context handling mcpManager *MCPManager // MCP integration manager (nil if MCP not configured) + dropExcessRequests atomic.Bool // If true, in cases where the queue is full, requests will not wait for the queue to be empty and will be dropped instead. } // PluginPipeline encapsulates the execution of plugin PreHooks and PostHooks, tracks how many plugins ran, and manages short-circuiting and error aggregation. 
@@ -281,13 +282,13 @@ func Init(config schemas.BifrostConfig) (*Bifrost, error) { } bifrost := &Bifrost{ - account: config.Account, - plugins: config.Plugins, - waitGroups: make(map[schemas.ModelProvider]*sync.WaitGroup), - requestQueues: make(map[schemas.ModelProvider]chan ChannelMessage), - dropExcessRequests: config.DropExcessRequests, - backgroundCtx: context.Background(), + account: config.Account, + plugins: config.Plugins, + waitGroups: make(map[schemas.ModelProvider]*sync.WaitGroup), + requestQueues: make(map[schemas.ModelProvider]chan ChannelMessage), + backgroundCtx: context.Background(), } + bifrost.dropExcessRequests.Store(config.DropExcessRequests) // Initialize object pools bifrost.channelMessagePool = sync.Pool{ @@ -576,7 +577,7 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, queue chan Chan } break } - + result, bifrostError = executor(provider, &req, key) if bifrostError != nil && !bifrostError.IsBifrostError { break // Don't retry client errors @@ -919,7 +920,7 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont bifrost.releaseChannelMessage(msg) return nil, newBifrostErrorFromMsg("request cancelled while waiting for queue space") default: - if bifrost.dropExcessRequests { + if bifrost.dropExcessRequests.Load() { bifrost.releaseChannelMessage(msg) bifrost.logger.Warn("Request dropped: queue is full, please increase the queue size or set dropExcessRequests to false") return nil, newBifrostErrorFromMsg("request dropped: queue is full") @@ -958,6 +959,13 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont } } +// UpdateDropExcessRequests updates the DropExcessRequests setting at runtime. +// This allows for hot-reloading of this configuration value. 
+func (bifrost *Bifrost) UpdateDropExcessRequests(value bool) { + bifrost.dropExcessRequests.Store(value) + bifrost.logger.Info(fmt.Sprintf("DropExcessRequests updated to: %v", value)) +} + // ExecuteMCPTool executes an MCP tool call and returns the result as a tool message. // This is the main public API for manual MCP tool execution. // diff --git a/transports/Dockerfile b/transports/Dockerfile index 703180ef1d..624ae6aff5 100644 --- a/transports/Dockerfile +++ b/transports/Dockerfile @@ -43,11 +43,9 @@ USER appuser # Environment variables with defaults ENV APP_PORT=8080 \ APP_POOL_SIZE=300 \ - APP_DROP_EXCESS_REQUESTS=false \ - APP_PLUGINS="" \ - APP_PROMETHEUS_LABELS="" + APP_PLUGINS="" EXPOSE 8080 # Direct entrypoint with environment variable expansion -ENTRYPOINT ["/bin/sh", "-c", "exec /app/main -config /app/config/config.json -port \"${APP_PORT}\" -pool-size \"${APP_POOL_SIZE}\" -drop-excess-requests \"${APP_DROP_EXCESS_REQUESTS}\" -plugins \"${APP_PLUGINS}\" -prometheus-labels \"${APP_PROMETHEUS_LABELS}\""] \ No newline at end of file +ENTRYPOINT ["/bin/sh", "-c", "exec /app/main -config /app/config/config.json -port \"${APP_PORT}\" -pool-size \"${APP_POOL_SIZE}\" -plugins \"${APP_PLUGINS}\""] \ No newline at end of file diff --git a/transports/bifrost-http/handlers/config.go b/transports/bifrost-http/handlers/config.go new file mode 100644 index 0000000000..fdfe86d855 --- /dev/null +++ b/transports/bifrost-http/handlers/config.go @@ -0,0 +1,65 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/fasthttp/router" + bifrost "github.com/maximhq/bifrost/core" + "github.com/maximhq/bifrost/core/schemas" + "github.com/valyala/fasthttp" +) + +// ConfigHandler manages runtime configuration updates for Bifrost. +// It provides an endpoint to hot-reload settings from the configuration file. 
+type ConfigHandler struct {
+	client     *bifrost.Bifrost
+	logger     schemas.Logger
+	configPath string
+}
+
+// NewConfigHandler creates a new handler for configuration management.
+// It requires the Bifrost client, a logger, and the path to the config file to be reloaded.
+func NewConfigHandler(client *bifrost.Bifrost, logger schemas.Logger, configPath string) *ConfigHandler {
+	return &ConfigHandler{
+		client:     client,
+		logger:     logger,
+		configPath: configPath,
+	}
+}
+
+// RegisterRoutes registers the configuration-related routes.
+// It adds the `PUT /config` endpoint.
+func (h *ConfigHandler) RegisterRoutes(r *router.Router) {
+	r.PUT("/config", h.handleReloadConfig)
+}
+
+// handleReloadConfig re-reads the configuration file at h.configPath and applies updatable settings.
+// Currently, only `client.drop_excess_requests` is hot-reloaded; the running value is left untouched when the key is absent.
+// Note that settings like `prometheus_labels` cannot be changed at runtime.
+func (h *ConfigHandler) handleReloadConfig(ctx *fasthttp.RequestCtx) {
+	var config struct {
+		Client struct {
+			DropExcessRequests *bool `json:"drop_excess_requests,omitempty"` // pointer so a missing key (nil) is distinguishable from false
+		} `json:"client"` // must match the "client" section that main.go reads at startup
+	}
+
+	data, err := os.ReadFile(h.configPath)
+	if err != nil {
+		SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("failed to read config file: %v", err), h.logger)
+		return
+	}
+
+	if err := json.Unmarshal(data, &config); err != nil {
+		SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("failed to parse config file: %v", err), h.logger)
+		return
+	}
+
+	if config.Client.DropExcessRequests != nil {
+		h.client.UpdateDropExcessRequests(*config.Client.DropExcessRequests)
+	}
+
+	ctx.SetStatusCode(fasthttp.StatusOK)
+	SendJSON(ctx, map[string]interface{}{"status": "config reloaded", "drop_excess_requests": config.Client.DropExcessRequests}, h.logger)
+}
diff --git a/transports/bifrost-http/main.go b/transports/bifrost-http/main.go
index 304ee8d62c..d8e4829f96 100644
---
a/transports/bifrost-http/main.go +++ b/transports/bifrost-http/main.go @@ -50,6 +50,7 @@ package main import ( + "encoding/json" "flag" "fmt" "log" @@ -72,12 +73,10 @@ import ( // Command line flags var ( - initialPoolSize int // Initial size of the connection pool - dropExcessRequests bool // Drop excess requests - port string // Port to run the server on - configPath string // Path to the config file - pluginsToLoad []string // Path to the plugins - prometheusLabels []string // Labels to add to Prometheus metrics (optional) + initialPoolSize int // Initial size of the connection pool + port string // Port to run the server on + configPath string // Path to the config file + pluginsToLoad []string // Path to the plugins ) // init initializes command line flags and validates required configuration. @@ -85,17 +84,13 @@ var ( // - pool-size: Initial connection pool size (default: 300) // - port: Server port (default: 8080) // - config: Path to config file (required) -// - drop-excess-requests: Whether to drop excess requests func init() { pluginString := "" - var prometheusLabelsString string flag.IntVar(&initialPoolSize, "pool-size", 300, "Initial pool size for Bifrost") flag.StringVar(&port, "port", "8080", "Port to run the server on") flag.StringVar(&configPath, "config", "", "Path to the config file") - flag.BoolVar(&dropExcessRequests, "drop-excess-requests", false, "Drop excess requests") flag.StringVar(&pluginString, "plugins", "", "Comma separated list of plugins to load") - flag.StringVar(&prometheusLabelsString, "prometheus-labels", "", "Labels to add to Prometheus metrics") flag.Parse() pluginsToLoad = strings.Split(pluginString, ",") @@ -103,17 +98,6 @@ func init() { if configPath == "" { log.Fatalf("config path is required") } - - if prometheusLabelsString != "" { - // Split and filter out empty strings - rawLabels := strings.Split(prometheusLabelsString, ",") - prometheusLabels = make([]string, 0, len(rawLabels)) - for _, label := range rawLabels { - 
if trimmed := strings.TrimSpace(label); trimmed != "" { - prometheusLabels = append(prometheusLabels, strings.ToLower(trimmed)) - } - } - } } // registerCollectorSafely attempts to register a Prometheus collector, @@ -144,11 +128,29 @@ func main() { registerCollectorSafely(collectors.NewGoCollector()) registerCollectorSafely(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - tracking.InitPrometheusMetrics(prometheusLabels) + logger := bifrost.NewDefaultLogger(schemas.LogLevelInfo) - log.Println("Prometheus Go/Process collectors registered.") + // Define a struct to unmarshal the entire config file + var config struct { + Client struct { + DropExcessRequests bool `json:"drop_excess_requests"` + PrometheusLabels []string `json:"prometheus_labels"` + } `json:"client"` + Providers json.RawMessage `json:"providers"` + MCP *schemas.MCPConfig `json:"mcp"` + } - logger := bifrost.NewDefaultLogger(schemas.LogLevelInfo) + // Read and parse config + data, err := os.ReadFile(configPath) + if err != nil { + log.Fatalf("failed to read config file: %v", err) + } + if err := json.Unmarshal(data, &config); err != nil { + log.Fatalf("failed to parse config JSON: %v", err) + } + + tracking.InitPrometheusMetrics(config.Client.PrometheusLabels) + log.Println("Prometheus Go/Process collectors registered.") // Initialize high-performance configuration store with caching store, err := lib.NewConfigStore(logger) @@ -200,7 +202,7 @@ func main() { client, err := bifrost.Init(schemas.BifrostConfig{ Account: account, InitialPoolSize: initialPoolSize, - DropExcessRequests: dropExcessRequests, + DropExcessRequests: config.Client.DropExcessRequests, Plugins: loadedPlugins, MCPConfig: mcpConfig, Logger: logger, @@ -214,6 +216,7 @@ func main() { completionHandler := handlers.NewCompletionHandler(client, logger) mcpHandler := handlers.NewMCPHandler(client, logger) integrationHandler := handlers.NewIntegrationHandler(client) + configHandler := handlers.NewConfigHandler(client, 
logger, configPath) r := router.New() @@ -222,6 +225,7 @@ func main() { completionHandler.RegisterRoutes(r) mcpHandler.RegisterRoutes(r) integrationHandler.RegisterRoutes(r) + configHandler.RegisterRoutes(r) // Add Prometheus /metrics endpoint r.GET("/metrics", fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler())) diff --git a/transports/config.example.json b/transports/config.example.json index ffc7152303..51967ff6e5 100644 --- a/transports/config.example.json +++ b/transports/config.example.json @@ -1,4 +1,8 @@ { + "client": { + "drop_excess_requests": false, + "prometheus_labels": ["model", "provider"] + }, "providers": { "openai": { "keys": [